首頁 > 軟體

CompletableFuture 非同步編排範例詳解

2022-09-04 18:04:01

從Future聊起

Future是java 1.5引入的非同步程式設計api,它表示一個非同步計算結果,提供了獲取非同步結果的能力,解決了多執行緒場景下Runnable執行緒任務無法獲取結果的問題。

但是其獲取非同步結果的方式並不夠優雅,我們必須使用Future.get的方式阻塞呼叫執行緒,或者使用輪詢方式判斷 Future.isDone 任務是否結束,再獲取結果。

public interface Future<V> {

    //任務是否完成
    boolean isDone();
    
    //阻塞呼叫執行緒獲取非同步結果
    V get() throws InterruptedException, ExecutionException;

   //在指定時間內阻塞執行緒獲取非同步結果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

假如存在多個非同步任務相互依賴,一個或多個非同步執行緒任務需要依賴上一個非同步執行緒任務結果,並且多個非同步任務能夠組合結果,顯然這種阻塞執行緒的方式並不能優雅解決。

我們更希望能夠提供一種非同步回撥的方式,組合各種非同步任務,而無需開發者對多個非同步任務結果的監聽編排。

為了解決優化上述問題,java8 新增了CompletableFutureAPI ,其大大擴充套件了Future能力,並提供了非同步任務編排能力。

CompletableFuture

CompletableFuture實現了新的介面CompletionStage,並擴充套件了Future介面。檢視類圖

建立非同步任務

CompletableFuture 提供了四種方法去建立一個非同步任務。

  • static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier):建立一個有返回值的非同步任務範例
  • static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor):建立一個有返回值的非同步任務範例,可以指定執行緒池
  • static CompletableFuture<Void> runAsync(Runnable runnable):建立一個無返回值的任務範例
  • static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor):建立一個無返回值的任務範例,允許指定執行緒池

著幾個方法本質上是有返回值和無返回值兩種型別方法,supply方法可以獲取非同步結果,而run方法則無返回值,根據需要使用。

同時兩種型別的方法均提供了指定執行緒池的過載,如果不指定執行緒池會預設使用ForkJoinPool.commonPool(),預設執行緒數為cpu核心數,建議使用自定義執行緒池的方式,避免執行緒資源競爭

一個簡單樣例

        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { System.out.println("無返回值任務"); });
        runAsync.get();
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "hello completableFuture");
        String result = supplyAsync.get();
        System.out.println(result);

我們依然可以通過get()方法阻塞獲取非同步結果任務,但是CompletableFuture主要還是用於非同步回撥及非同步任務編排使用。

非同步回撥

在任務執行結束後我們希望能夠自動觸發回撥方法,CompletableFuture提供了兩種方法實現。

  • CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action):當上一階段任務執行結束後,回撥方法接受上一階段結果或者異常,返回上一階段任務結果
  • <U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn):當上一階段任務執行結束後,回撥方法接受上一階段結果或者異常,並最終返回回撥方法處理結果
  • CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn):上一階段任務出現異常後的回撥,返回結果是回撥函數的返回結果。

whenComplete 與 handle 區別:兩者均接受上一階段任務結果或異常,但是whenComplete 回撥中沒有返回值,所以其結果是上一階段任務,而handle 最終返回的是其回撥方法方法,其主要是BiConsumerBiFunction的區別。

非同步編排

CompletionStage表示非同步計算的一個階段,當一個計算處理完成後會觸發其他依賴的階段。當然一個階段的觸發也可以是由多個階段的完成觸發或者多箇中的任意一個完成觸發。該介面定義了非同步任務編排的各種場景,CompletableFuture則實現了這些場景。

可以把這些場景大致分為三類:序列、AND和OR。下面會逐個分析各個場景,介面中定義的以Async結尾的方法,指下一階段任務會被單獨提交到執行緒池中執行,後面不在贅述。

序列

當上一階段任務執行完畢後,繼續提交執行其他任務

  • <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn):接收上一階段任務結果,並可獲取返回值。
  • CompletableFuture<Void> thenAccept(Consumer<? super T> action):接收上一階段任務結果,無返回值。
  • CompletableFuture<Void> thenRun(Runnable action):不接收上一階段任務結果,並且無返回值。

T:上一個任務返回結果的型別 U:當前任務的返回值型別

AND

組合多個非同步任務,當多個任務執行完畢繼續執行其他任務

  • <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn):上一階段任務與other任務均執行結束,接收兩個任務的結果,並可獲取返回值
  • <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn): 使用上一階段任務的結果,返回一個新的CompletableFuture範例
  • <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action):上一階段任務與other任務均執行結束,接收兩個任務的結果,無返回值
  • CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action):上一階段任務與other任務均執行結束,不接收兩個任務的結果,無返回值
  • static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs):等待所有非同步任務執行結束

T:上一個任務返回結果的型別 U:上一個other任務的返回值型別 V:當前任務返回值

OR

當多個任務中任意任務執行完成則繼續執行其他任務。

  • <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn): 接收上一階段任務與other任務最快執行完成的結果,並可獲取返回值
  • CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action):接收上一階段任務與other任務最快執行完成的結果,無返回值
  • CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action):上一階段任務與other任務任意任務完成執行,不接收結果,無返回值
  • static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs):組合多個任務,返回最快執行結束的任務結果

Future 機制擴充套件

CompletableFuture不僅實現了Future介面,同時對其進行了擴充套件,提供了更加優雅的實現。

  • T join() :與get()方法用法一致,阻塞呼叫執行緒獲取結果,但是不會丟擲具體異常,簡化了使用上下文
  • T getNow(T valueIfAbsent):當任務結束返回任務結果,否則返回給定的結果valueIfAbsent。
  • boolean complete(T value):當任務未結束時設定給定的結果value並結束任務,已結束的任務不會生效。
  • boolean completeExceptionally(Throwable ex):當任務未結束時設定異常結果並結束任務,已結束的任務不會生效

CompletableFuture 實踐

我們通過CompletableFuture實現一個經典的燒水程式。

我們可以把這個流程分為三個非同步任務。

任務1:洗水壺->燒水

任務2:洗水壺->洗茶杯->拿茶葉

任務3:泡茶,需要等待任務1與任務2結束。

通過程式碼模擬實現

        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("洗水壺");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "水壺";
        }).thenApply(e->{
            System.out.println("燒水");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "熱水";
        });
        //洗水壺->洗水杯->拿茶葉
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("洗茶壺");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "茶壺";
        }).thenApply(e->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            System.out.println("洗水杯");
            return "水杯";
        }).thenApply(e->{
            System.out.println("拿茶葉");
            return "茶葉";
        });
        //泡茶
        CompletableFuture<String> task3 = task1.thenCombine(task2, (a, b) -> {
            System.out.println("泡茶");
            return "茶";
        });
        String tea = task3.join();
        System.out.println(tea);

以上就是CompletableFuture 非同步編排範例詳解的詳細內容,更多關於CompletableFuture 非同步編排的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com