首頁 > 軟體

Java多執行緒並行FutureTask使用詳解

2022-06-28 18:05:47

本文基於最新的 OpenJDK 程式碼,預計發行版本為 19 。

Java 的多執行緒機制本質上能夠完成兩件事情,非同步計算和並行。並行問題通過解決執行緒安全的一系列 API 來解決;而非同步計算,常見的使用是 Runnable 和 Callable 配合執行緒使用。

FutureTask 是基於 Runnable 實現的一個可取消的非同步呼叫 API 。

基本使用

  • Future 代表了非同步計算的結果,通過 ExecutorService 的 Future<?> submit(Runnable task) 方法,作為返回值使用:
interface ArchiveSearcher { String search(String target); }
class App {
		ExecutorService executor = ...;
    ArchiveSearcher searcher = ...;
    void showSearch(String target) throws InterruptedException {
       Callable<String> task = () -> searcher.search(target);
       Future<String> future = executor.submit(task); // 獲取執行結果
       displayOtherThings(); // do other things while searching
       try {
         displayText(future.get()); // use future
       } catch (ExecutionException ex) { cleanup(); return; }
    }
}
  • FutureTask類是實現了Runnable的Future的實現,因此可以由Executor執行。例如,上述帶有submit的構造可以替換為:
class App {
		ExecutorService executor = ...;
    ArchiveSearcher searcher = ...;
    void showSearch(String target) throws InterruptedException {
       Callable<String> task = () -> searcher.search(target);
       // 關鍵兩行替換
       FutureTask<String> future = new FutureTask<>(task);
			 executor.execute(future);
       displayOtherThings(); // do other things while searching
       try {
         displayText(future.get()); // use future
       } catch (ExecutionException ex) { cleanup(); return; }
    }
}

程式碼分析

繼承關係

Future

Future 表示非同步計算的結果。定義了用於檢查計算是否完成、等待計算完成以及檢索計算結果的能力。只有在計算完成後,才能使用 get 方法檢索結果,在必要時會阻塞執行緒直到 Future 計算完成。取消是由 cancel 方法執行的。還提供了其他方法來確定任務是正常完成還是被取消。一旦計算完成,就不能取消計算。如果為了可取消性而使用 Future ,但又不想提供一個可用的結果,你可以宣告形式 Future<?> 並返回 null 作為任務的結果。

在介紹 Future 中定義的能力之前,先了解一下它的用來表示 Future 狀態內部類,和狀態檢索方法:

public interface Future<V> {
    enum State {
       	// The task has not completed.
        RUNNING,
        // The task completed with a result. @see Future#resultNow()
        SUCCESS,
        //The task completed with an exception. @see Future#exceptionNow()
        FAILED,
        // The task was cancelled. @see #cancel(boolean)
        CANCELLED
    }

    default State state() {
        if (!isDone())		// 根據 isDone() 判斷執行中
            return State.RUNNING;
        if (isCancelled()) // 根據 isCancelled() 判斷已取消
            return State.CANCELLED;
        boolean interrupted = false;
        try {
            while (true) { // 死迴圈輪詢
                try {
                    get();  // may throw InterruptedException when done
                    return State.SUCCESS; 
                } catch (InterruptedException e) {
                    interrupted = true;
                } catch (ExecutionException e) {
                    return State.FAILED;
                }
            }
        } finally {
            if (interrupted) Thread.currentThread().interrupt();
        }
    }
}

Future 的狀態檢索的預設實現是根據 isDone()isCancelled() 和不斷輪詢 get() 方法獲取到的返回值判斷的。

get() 正常返回結果時, state() 返回 State.SUCCESS ; 當丟擲 InterruptedException 時,最終會操作所在的執行緒執行嘗試中斷的方法;丟擲其他異常時,則返回 State.FAILED

Future 中定義的其他方法包括:

package java.util.concurrent;
public interface Future<V> {
		// 取消操作
    boolean cancel(boolean mayInterruptIfRunning);
		// 檢查是否取消
    boolean isCancelled();
		// 檢查是否完成
    boolean isDone();
		// 獲取計算結果的方法
    V get() throws InterruptedException, ExecutionException;
		// 帶有超時限制的獲取計算結果的方法
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
  	// 立刻返回結果
  	default V resultNow()
  	// 立刻丟擲異常
  	default Throwable exceptionNow()
}

其中 resultNow()exceptionNow() 是帶有預設實現的:

		default V resultNow() {
        if (!isDone())
            throw new IllegalStateException("Task has not completed");
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return get();
                } catch (InterruptedException e) {
                    interrupted = true;
                } catch (ExecutionException e) {
                    throw new IllegalStateException("Task completed with exception");
                } catch (CancellationException e) {
                    throw new IllegalStateException("Task was cancelled");
                }
            }
        } finally {
            if (interrupted) Thread.currentThread().interrupt();
        }
    }
  • Future 仍在執行中,直接丟擲 IllegalStateException 。
  • 執行一個輪詢,呼叫 get() 嘗試返回計算結果,如果 get() 丟擲異常,則根據異常丟擲不同訊息的 IllegalStateException 或執行中斷執行緒的操作。
    default Throwable exceptionNow() {
        if (!isDone())
            throw new IllegalStateException("Task has not completed");
        if (isCancelled())
            throw new IllegalStateException("Task was cancelled");
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    get();
                    throw new IllegalStateException("Task completed with a result");
                } catch (InterruptedException e) {
                    interrupted = true;
                } catch (ExecutionException e) {
                    return e.getCause();
                }
            }
        } finally {
            if (interrupted) Thread.currentThread().interrupt();
        }
    }
  • Future 仍在執行中,直接丟擲 IllegalStateException 。
  • Future 檢查是否已取消,如果取消了丟擲 IllegalStateException 。
  • 執行輪詢,呼叫 get() 方法,如果能夠正常執行結束,也丟擲 IllegalStateException ,訊息是 "Task completed with a result" ;get() 若丟擲 InterruptedException ,則執行執行緒中斷操作;其他異常正常丟擲。

這就是 Future 的全貌了。

RunnableFuture

RunnableFuture 介面同時實現了 Runnable 和 Future 介面 :

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     * 除非已取消,否則將此Future設定為其計算的結果。
     */
    void run();
}

Runnable 介面是我們常用的用來實現執行緒操作的,可以說是十分熟悉也十分簡單了。

這個介面代表了一個可以 Runnable 的 Future ,run 方法的成功執行代表著 Future 執行完成,並可以獲取它的計算結果。

這個介面是 JDK 1.6 後續才有的。

FutureTask

FutureTask 是 RunnableFuture 的直接實現類,它代表了一個可取消的非同步計算任務。根據我們對 Future 的分析和 Runnable 的熟悉,就可以理解它的作用了:可取消並可以檢索執行狀態的一個 Runnable ,配合執行緒使用可以中斷執行緒執行。當任務沒有執行完成時會造成阻塞。並且它還可以配合 Executor 使用。

狀態

FutureTask 內部也定義了自己的狀態:

public class FutureTask<V> implements RunnableFuture<V> {
		private volatile int state;
    private static final int NEW          = 0; // 新建
    private static final int COMPLETING   = 1; // 完成中
    private static final int NORMAL       = 2; // 正常完成
    private static final int EXCEPTIONAL  = 3; // 異常的
    private static final int CANCELLED    = 4; // 已取消
    private static final int INTERRUPTING = 5; // 中斷中
    private static final int INTERRUPTED  = 6; // 已中斷
  
		@Override
    public State state() {
        int s = state;
        while (s == COMPLETING) {
            // 等待過渡到 NORMAL 或 EXCEPTIONAL
            Thread.yield();
            s = state;
        }
        switch (s) {
            case NORMAL:
                return State.SUCCESS;
            case EXCEPTIONAL:
                return State.FAILED;
            case CANCELLED:
            case INTERRUPTING:
            case INTERRUPTED:
                return State.CANCELLED;
            default:
                return State.RUNNING;
        }
    }
}

FutureTask 的狀態包括 7 種,最初為 NEW ,只有在 set、setException 和 cancel 方法中,執行狀態才會轉換為最終狀態。在完成期間,狀態可能為 COMPLETING (當結果正在設定時) 或 INTERRUPTING (僅當中斷跑者以滿足cancel(true) )的瞬態值。

可能存在的狀態轉換是:

NEW -> COMPLETING -> NORMAL // 正常完成
NEW -> COMPLETING -> EXCEPTIONAL // 丟擲異常
NEW -> CANCELLED // 取消
NEW -> INTERRUPTING -> INTERRUPTED // 中斷

屬性

下面分析一下它的屬性:

    /** 底層的呼叫;執行後為null */
    private Callable<V> callable;
    /** get()返回的結果或丟擲的異常 */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** 等待執行緒的 Treiber 堆疊 */
    private volatile WaitNode waiters;

內部類

先看一看這個 WaitNode ,這是一個 FutureTask 的內部類:

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

一個連結串列結構,用來對等待執行緒進行排序。

構造方法

最後是方法的分析,首先是構造方法:

    // Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}.
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion. 
     * Runnable 成功是返回給定的結果 result
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

FutureTask 接收一個 Callable 或一個 Runnable 作為引數,Runnable 會封裝一下都儲存到屬性 callable ,然後更新 FutureTask 的狀態為 NEW

從 Future 介面中實現的方法逐個分析:

檢索 FutureTask 狀態

    public boolean isCancelled() {
        return state >= CANCELLED; // 大於等於 4, 已取消、中斷中、已中斷
    }

    public boolean isDone() {
        return state != NEW; // 不是 new 就代表執行結束了
    }

取消操作

    // mayInterruptIfRunning 表示最終的取消是通過中斷還是通過取消。
		public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW && STATE.compareAndSet(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))  // 嘗試設定 CANCELLED 或 INTERRUPTING 
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt(); // 通過中斷取消任務
                } finally { // final state
                    STATE.setRelease(this, INTERRUPTED); // 更新中斷狀態
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

這裡的 finishCompletion() 的作用是通過 LockSupport 喚醒等待的全部執行緒並從等待列表中移除,然後呼叫done(),最後把 callable 置空。相當於取消成功後釋放資源的操作。

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (WAITERS.weakCompareAndSet(this, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;        // to reduce footprint
    }

done() 是個空實現,供子類去自定義的。

protected void done() { }

計算結果

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L); // 非同步結果
        return report(s);
    }
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

這裡涉及兩個方法:awaitDone 方法和 report 方法 。

awaitDone 方法:

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - if nanos <= 0L, 及時返回,不需要 allocation 或 nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {  // COMPLETING = 1
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // 瞬時態,完成中
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q); // 執行緒中斷,移除等待的執行緒
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
            else if (timed) { // 設定超時時間的情況
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

通過 CAS 和 LockSupport 的掛起/喚醒操作配合,阻塞當前執行緒,非同步地等待計算結果。

這裡有個 removeWaiter 方法,內部就是遍歷 waiters ,刪除超時和中斷的等待執行緒。

當非同步邏輯執行完成後,呼叫 report 方法:

    // 為完成的任務返回結果或丟擲異常
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

這裡用到了一個 outcome ,它是一個 Object 型別,作為返回結果,通過 set 方法可以對它進行設定:

    // 除非該 future 已被設定或取消,否則將該 future 的結果設定為給定值。
		// 該方法在成功完成計算後由 run 方法在內部呼叫。
		protected void set(V v) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = v;
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }

立刻獲取結果或異常

這兩個方法都是通過 outcome 預設的返回值,返回預期的結果或異常。

    public V resultNow() {
        switch (state()) {    // Future.State
            case SUCCESS:
                @SuppressWarnings("unchecked")
                V result = (V) outcome;
                return result;
            case FAILED:
                throw new IllegalStateException("Task completed with exception");
            case CANCELLED:
                throw new IllegalStateException("Task was cancelled");
            default:
                throw new IllegalStateException("Task has not completed");
        }
    }

    @Override
    public Throwable exceptionNow() {
        switch (state()) {    // Future.State
            case SUCCESS:
                throw new IllegalStateException("Task completed with a result");
            case FAILED:
                Object x = outcome;
                return (Throwable) x;
            case CANCELLED:
                throw new IllegalStateException("Task was cancelled");
            default:
                throw new IllegalStateException("Task has not completed");
        }
    }

run 方法組

最後是實現了 Runnable 的 run 方法:

    public void run() {
      	// 保證 NEW 狀態和 RUNNER 成功設定當前執行緒
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable; // 待執行的 Callable
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call(); // 執行 Callable 
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // 為了防止並行呼叫 run ,直到 state 確定之前, runner 必須是非空的
            runner = null;
            // 狀態必須在 runner 置空後重新讀取,以防止洩漏中斷
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

這裡涉及兩個方法,第一個是 setException(ex) :

    // 導致該future報告一個{@link ExecutionException},並將給定的可丟擲物件作為其原因,除非該future已經被設定或取消。
    protected void setException(Throwable t) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = t;
            STATE.setRelease(this, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

另一個是 handlePossibleCancellationInterrupt 方法:

    /**
     * 確保任何來自可能的 cancel(true) 的中斷只在 run 或 runAndReset 時交付給任務。
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;
        // 我們想清除可能從cancel(true)接收到的所有中斷。
        // 然而,允許使用中斷作為任務與其呼叫者通訊的獨立機制,並沒有辦法只清除取消中斷。
        // Thread.interrupted();
    }

最後是 runAndReset 方法:

    protected boolean runAndReset() {
        if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return false;
        boolean ran = false; // flag 表示正常執行結束
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state; // 
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW; // 當正常執行結束,且 state 一開始就是 NEW 時,表示可以執行並重置。
    }

執行計算時不設定其結果,然後將該 future 重置為初始狀態,如果計算遇到異常或被取消,則不這樣做。這是為本質上執行多次的任務設計的。

run 和 runAndReset 都用到了一個 RUNNER , 最後就來揭祕一下這個東西:

    private static final VarHandle STATE;
    private static final VarHandle RUNNER;
    private static final VarHandle WAITERS;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            STATE = l.findVarHandle(FutureTask.class, "state", int.class);
            RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);
            WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }

MethodHandles.lookup()建立一個MethodHandles.Lookup物件,該物件可以建立所有存取許可權的方法,包括publicprotectedprivatedefault

VarHandle 主要用於動態運算元組的元素或物件的成員變數VarHandle通過 MethodHandles 來獲取範例,然後呼叫 VarHandle 的方法即可動態操作指定陣列的元素或指定物件的成員變數。

到此這篇關於Java 多執行緒並行FutureTask的文章就介紹到這了,更多相關Java 多執行緒並行內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com