首頁 > 軟體

Java多執行緒工具CompletableFuture的使用教學

2022-08-19 18:00:34

前言

Future的問題

寫多執行緒程式的時候,可以使用Future從一個非同步執行緒中拿到結果,但是如果使用過程中會發現一些問題:

  • 如果想要對Future的結果做進一步的操作,需要阻塞當前執行緒
  • 多個Future不能被鏈式的執行,每個Future的結果都是獨立的,期望對一個Future的結果做另外一件非同步的事情;
  • 沒有例外處理策略,如果Future執行失敗了,需要手動捕捉

CompletableFuture應運而生

為了解決Future問題,JDK在1.8的時候給我們提供了一個好用的工具類CompletableFuture;

它實現了Future和CompletionStage介面,針對Future的不足之處給出了相應的處理方式。

  • 在非同步執行緒執行結束後可以自動回撥我們新的處理邏輯,無需阻塞
  • 可以對多個非同步任務進行編排,組合或者排序
  • 例外處理

CompletableFuture的核心思想是將每個非同步任務都可以看做一個步驟(CompletionStage),然後其他的非同步任務可以根據這個步驟做一些想做的事情。

CompletionStage定義了許多步驟處理的方法,功能非常強大,這裡就只列一下日常中常用到的一些方法供大家參考。

使用方式

基本使用-提交非同步任務

簡單的使用方式

非同步執行,無需結果:

// 可以執行Executors非同步執行,如果不指定,預設使用ForkJoinPool
CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));

非同步執行,同時返回結果:

// 同樣可以指定執行緒池
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!");
System.out.println(stringCompletableFuture.get());

處理上個非同步任務結果

  • thenRun: 不需要上一步的結果,直接直接新的操作

  • thenAccept:獲取上一步非同步處理的內容,進行新的操作

  • thenApply: 獲取上一步的內容,然後產生新的內容

所有加上Async字尾的,代表新的處理操作仍然是非同步的。Async的操作都可以指定Executors進行處理

// Demo
       CompletableFuture
                .supplyAsync(() -> "Hello CompletableFuture!")
                // 針對上一步的結果做處理,產生新的結果
                .thenApplyAsync(s -> s.toUpperCase())
                // 針對上一步的結果做處理,不返回結果
                .thenAcceptAsync(s -> System.out.println(s))
                // 不需要上一步返回的結果,直接進行操作
                .thenRunAsync(() -> System.out.println("end"));
        ;

對兩個結果進行選用-acceptEither

當我們有兩個回撥在處理的時候,任何完成都可以使用,兩者結果沒有關係,那麼使用acceptEither。

兩個非同步執行緒誰先執行完成,用誰的結果,其餘型別的方法也是如此。

// 返回abc
CompletableFuture
                .supplyAsync(() -> {
                    SleepUtils.sleep(100);
                    return "Hello CompletableFuture!";
                })
                .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        System.out.println(s);
                    }
                });
// 返回Hello CompletableFuture!       
CompletableFuture
                .supplyAsync(() -> "Hello CompletableFuture!")
                .acceptEither(CompletableFuture.supplyAsync(() -> {
                    SleepUtils.sleep(100);
                    return "abc";
                }), new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        System.out.println(s);
                    }
                });

對兩個結果進行合併-thenCombine, thenAcceptBoth

thenCombine

當我們有兩個CompletionStage時,需要對兩個的結果進行整合處理,然後計算得出一個新的結果。

  • thenCompose是對上一個CompletionStage的結果進行處理,返回結果,並且返回型別必須是CompletionStage。
  • thenCombine是得到第一個CompletionStage的結果,然後拿到當前的CompletionStage,兩者的結果進行處理。
        CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172);

        CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65)
                .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() {
                    @Override
                    public Double apply(Integer wight, Integer height) {
                        return wight * 10000.0 / (height * height);
                    }
                })
                ;

thenAcceptBoth

需要兩個非同步CompletableFuture的結果,兩者都完成的時候,才進入thenAcceptBoth回撥。

// thenAcceptBoth案例:
        CompletableFuture
                .supplyAsync(() -> "Hello CompletableFuture!")
                .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() {
                		// 引數一為我們剛開始執行時的CompletableStage,新傳入的作為第二個引數
                    @Override
                    public void accept(String s, String s2) {
                        System.out.println("param1=" + s + ", param2=" + s2);
                    }
                });
// 結果:param1=Hello CompletableFuture!, param2=abc

例外處理

當我們使用CompleteFuture進行鏈式呼叫的時候,多個非同步回撥中,如果有一個執行出現問題,那麼接下來的回撥都會停止,所以需要一種例外處理策略。

exceptionally

exceptionally是當出現錯誤時,給我們機會進行恢復,自定義返回內容。

        CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("發生錯誤");
        }).exceptionally(throwable -> {
            log.error("呼叫錯誤 {}", throwable.getMessage(), throwable);
            return "例外處理內容";
        });

handle

exceptionally是隻有發生異常時才會執行,而handle則是不管是否發生錯誤都會執行。

CompletableFuture.supplyAsync(() -> {
    return "abc";
})
.handle((r,err) -> {
    log.error("呼叫錯誤 {}", err.getMessage(), err);
    // 對結果做額外的處理
    return r;
})
;

案例

大量使用者傳送簡訊|訊息

需求為對某個表中特定條件的使用者進行簡訊通知,但是簡訊使用者有成百上千萬,如果使用單執行緒讀取效率會很慢。這個時候可以考慮使用多執行緒的方式進行讀取;

1、將讀取任務拆分為多個不同的子任務,指定讀取的偏移量和個數

  // 假設有500萬條記錄
        long recordCount = 500 * 10000;
        int subTaskRecordCount = 10000;
        // 對記錄進行分片
        List<Map> subTaskList = new LinkedList<>();
        for (int i = 0; i < recordCount / 500; i++) {
            // 如果子任務結構複雜,建議使用物件
            HashMap<String, Integer> subTask = new HashMap<>();
            subTask.put("index", i);
            subTask.put("offset", i * subTaskRecordCount);
            subTask.put("count", subTaskRecordCount);
            subTaskList.add(subTask);
        }

2、使用多執行緒進行批次讀取

  // 進行subTask批次處理,拆分為不同的任務
        subTaskList.stream()
                .map(subTask -> CompletableFuture.runAsync(()->{
                    // 讀取資料,然後處理
                    // dataTunel.read(subTask);
                },excuturs))   // 使用應用的通用任務執行緒池
                .map(c -> ((CompletableFuture<?>) c).join());

3、進行業務邏輯處理,或者直接在讀取完進行業務邏輯處理也是可以;

並行獲取商品不同資訊

在系統拆分比較細的時候,價格,優惠券,庫存,商品詳情等資訊分散在不同的系統中,有時候需要同時獲取商品的所有資訊, 有時候可能只需要獲取商品的部分資訊。

當然問題點在於要呼叫多個不同的系統,需要將RT降低下來,那麼需要進行並行呼叫;

     List<Task> taskList = new ArrayList<>();
        List<Object> result = taskList.stream()
                .map(task -> CompletableFuture.supplyAsync(()->{
//                    handlerMap.get(task).query();
                    return "";
                }, executorService))
                .map(c -> c.join())
                .collect(Collectors.toList());

問題

thenRun和thenRunAsync有什麼區別

  • 如果不使用傳入的執行緒池,大家用預設的執行緒池ForkJoinPool
  • thenRun用的預設和上一個任務使用相同的執行緒池
  • thenRunAsync在執行新的任務的時候可以接受傳入一個新的執行緒池,使用新的執行緒池執行任務;

handle和exceptional有什麼區別

exceptionally是隻有發生異常時才會執行,而handle則是不管是否發生錯誤都會執行。

最後

一般情況下上述簡單的API已經滿足絕大部分的場景了,如果有更復雜的訴求,可繼續深入研究。

到此這篇關於Java多執行緒工具CompletableFuture的使用教學的文章就介紹到這了,更多相關Java CompletableFuture內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com