<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
本文基於最新的 OpenJDK 程式碼,預計發行版本為 19 。
Java 的多執行緒機制本質上能夠完成兩件事情,非同步計算和並行。並行問題通過解決執行緒安全的一系列 API 來解決;而非同步計算,常見的使用是 Runnable 和 Callable 配合執行緒使用。
FutureTask 是基於 Runnable 實現的一個可取消的非同步呼叫 API 。
Future<?> submit(Runnable task)
方法,作為返回值使用:interface ArchiveSearcher { String search(String target); } class App { ExecutorService executor = ...; ArchiveSearcher searcher = ...; void showSearch(String target) throws InterruptedException { Callable<String> task = () -> searcher.search(target); Future<String> future = executor.submit(task); // 獲取執行結果 displayOtherThings(); // do other things while searching try { displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; } } }
class App { ExecutorService executor = ...; ArchiveSearcher searcher = ...; void showSearch(String target) throws InterruptedException { Callable<String> task = () -> searcher.search(target); // 關鍵兩行替換 FutureTask<String> future = new FutureTask<>(task); executor.execute(future); displayOtherThings(); // do other things while searching try { displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; } } }
Future 表示非同步計算的結果。定義了用於檢查計算是否完成、等待計算完成以及檢索計算結果的能力。只有在計算完成後,才能使用 get 方法檢索結果,在必要時會阻塞執行緒直到 Future 計算完成。取消是由 cancel 方法執行的。還提供了其他方法來確定任務是正常完成還是被取消。一旦計算完成,就不能取消計算。如果為了可取消性而使用 Future ,但又不想提供一個可用的結果,你可以宣告形式 Future<?>
並返回 null 作為任務的結果。
在介紹 Future 中定義的能力之前,先了解一下它的用來表示 Future 狀態內部類,和狀態檢索方法:
public interface Future<V> { enum State { // The task has not completed. RUNNING, // The task completed with a result. @see Future#resultNow() SUCCESS, //The task completed with an exception. @see Future#exceptionNow() FAILED, // The task was cancelled. @see #cancel(boolean) CANCELLED } default State state() { if (!isDone()) // 根據 isDone() 判斷執行中 return State.RUNNING; if (isCancelled()) // 根據 isCancelled() 判斷已取消 return State.CANCELLED; boolean interrupted = false; try { while (true) { // 死迴圈輪詢 try { get(); // may throw InterruptedException when done return State.SUCCESS; } catch (InterruptedException e) { interrupted = true; } catch (ExecutionException e) { return State.FAILED; } } } finally { if (interrupted) Thread.currentThread().interrupt(); } } }
Future 的狀態檢索的預設實現是根據 isDone()
、isCancelled()
和不斷輪詢 get()
方法獲取到的返回值判斷的。
當 get()
正常返回結果時, state()
返回 State.SUCCESS
; 當丟擲 InterruptedException
時,最終會操作所在的執行緒執行嘗試中斷的方法;丟擲其他異常時,則返回 State.FAILED
。
Future 中定義的其他方法包括:
package java.util.concurrent; public interface Future<V> { // 取消操作 boolean cancel(boolean mayInterruptIfRunning); // 檢查是否取消 boolean isCancelled(); // 檢查是否完成 boolean isDone(); // 獲取計算結果的方法 V get() throws InterruptedException, ExecutionException; // 帶有超時限制的獲取計算結果的方法 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; // 立刻返回結果 default V resultNow() // 立刻丟擲異常 default Throwable exceptionNow() }
其中 resultNow()
和 exceptionNow()
是帶有預設實現的:
default V resultNow() { if (!isDone()) throw new IllegalStateException("Task has not completed"); boolean interrupted = false; try { while (true) { try { return get(); } catch (InterruptedException e) { interrupted = true; } catch (ExecutionException e) { throw new IllegalStateException("Task completed with exception"); } catch (CancellationException e) { throw new IllegalStateException("Task was cancelled"); } } } finally { if (interrupted) Thread.currentThread().interrupt(); } }
get()
嘗試返回計算結果,如果 get()
丟擲異常,則根據異常丟擲不同訊息的 IllegalStateException 或執行中斷執行緒的操作。default Throwable exceptionNow() { if (!isDone()) throw new IllegalStateException("Task has not completed"); if (isCancelled()) throw new IllegalStateException("Task was cancelled"); boolean interrupted = false; try { while (true) { try { get(); throw new IllegalStateException("Task completed with a result"); } catch (InterruptedException e) { interrupted = true; } catch (ExecutionException e) { return e.getCause(); } } } finally { if (interrupted) Thread.currentThread().interrupt(); } }
get()
方法,如果能夠正常執行結束,也丟擲 IllegalStateException ,訊息是 "Task completed with a result" ;get()
若丟擲 InterruptedException ,則執行執行緒中斷操作;其他異常正常丟擲。這就是 Future 的全貌了。
RunnableFuture 介面同時實現了 Runnable 和 Future 介面 :
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. * 除非已取消,否則將此Future設定為其計算的結果。 */ void run(); }
Runnable 介面是我們常用的用來實現執行緒操作的,可以說是十分熟悉也十分簡單了。
這個介面代表了一個可以 Runnable 的 Future ,run 方法的成功執行代表著 Future 執行完成,並可以獲取它的計算結果。
這個介面是 JDK 1.6 後續才有的。
FutureTask 是 RunnableFuture 的直接實現類,它代表了一個可取消的非同步計算任務。根據我們對 Future 的分析和 Runnable 的熟悉,就可以理解它的作用了:可取消並可以檢索執行狀態的一個 Runnable ,配合執行緒使用可以中斷執行緒執行。當任務沒有執行完成時會造成阻塞。並且它還可以配合 Executor 使用。
FutureTask 內部也定義了自己的狀態:
public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; // 新建 private static final int COMPLETING = 1; // 完成中 private static final int NORMAL = 2; // 正常完成 private static final int EXCEPTIONAL = 3; // 異常的 private static final int CANCELLED = 4; // 已取消 private static final int INTERRUPTING = 5; // 中斷中 private static final int INTERRUPTED = 6; // 已中斷 @Override public State state() { int s = state; while (s == COMPLETING) { // 等待過渡到 NORMAL 或 EXCEPTIONAL Thread.yield(); s = state; } switch (s) { case NORMAL: return State.SUCCESS; case EXCEPTIONAL: return State.FAILED; case CANCELLED: case INTERRUPTING: case INTERRUPTED: return State.CANCELLED; default: return State.RUNNING; } } }
FutureTask 的狀態包括 7 種,最初為 NEW
,只有在 set、setException 和 cancel 方法中,執行狀態才會轉換為最終狀態。在完成期間,狀態可能為 COMPLETING
(當結果正在設定時) 或 INTERRUPTING
(僅當中斷跑者以滿足cancel(true)
)的瞬態值。
可能存在的狀態轉換是:
NEW -> COMPLETING -> NORMAL // 正常完成 NEW -> COMPLETING -> EXCEPTIONAL // 丟擲異常 NEW -> CANCELLED // 取消 NEW -> INTERRUPTING -> INTERRUPTED // 中斷
下面分析一下它的屬性:
/** 底層的呼叫;執行後為null */ private Callable<V> callable; /** get()返回的結果或丟擲的異常 */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** 等待執行緒的 Treiber 堆疊 */ private volatile WaitNode waiters;
先看一看這個 WaitNode ,這是一個 FutureTask 的內部類:
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
一個連結串列結構,用來對等待執行緒進行排序。
最後是方法的分析,首先是構造方法:
// Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}. public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } /** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Runnable}, and arrange that {@code get} will return the * given result on successful completion. * Runnable 成功是返回給定的結果 result */ public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
FutureTask 接收一個 Callable 或一個 Runnable 作為引數,Runnable 會封裝一下都儲存到屬性 callable
,然後更新 FutureTask 的狀態為 NEW
。
從 Future 介面中實現的方法逐個分析:
public boolean isCancelled() { return state >= CANCELLED; // 大於等於 4, 已取消、中斷中、已中斷 } public boolean isDone() { return state != NEW; // 不是 new 就代表執行結束了 }
// mayInterruptIfRunning 表示最終的取消是通過中斷還是通過取消。 public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && STATE.compareAndSet(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) // 嘗試設定 CANCELLED 或 INTERRUPTING return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); // 通過中斷取消任務 } finally { // final state STATE.setRelease(this, INTERRUPTED); // 更新中斷狀態 } } } finally { finishCompletion(); } return true; }
這裡的 finishCompletion()
的作用是通過 LockSupport 喚醒等待的全部執行緒並從等待列表中移除,然後呼叫done()
,最後把 callable 置空。相當於取消成功後釋放資源的操作。
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (WAITERS.weakCompareAndSet(this, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
done()
是個空實現,供子類去自定義的。
protected void done() { }
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); // 非同步結果 return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
這裡涉及兩個方法:awaitDone
方法和 report
方法 。
awaitDone 方法:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // The code below is very delicate, to achieve these goals: // - if nanos <= 0L, 及時返回,不需要 allocation 或 nanoTime // - if nanos == Long.MIN_VALUE, don't underflow // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic // and we suffer a spurious wakeup, we will do no worse than // to park-spin for a while long startTime = 0L; // Special value 0L means not yet parked WaitNode q = null; boolean queued = false; for (;;) { int s = state; if (s > COMPLETING) { // COMPLETING = 1 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 瞬時態,完成中 // We may have already promised (via isDone) that we are done // so never return empty-handed or throw InterruptedException Thread.yield(); else if (Thread.interrupted()) { removeWaiter(q); // 執行緒中斷,移除等待的執行緒 throw new InterruptedException(); } else if (q == null) { if (timed && nanos <= 0L) return s; q = new WaitNode(); } else if (!queued) queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); else if (timed) { // 設定超時時間的情況 final long parkNanos; if (startTime == 0L) { // first time startTime = System.nanoTime(); if (startTime == 0L) startTime = 1L; parkNanos = nanos; } else { long elapsed = System.nanoTime() - startTime; if (elapsed >= nanos) { removeWaiter(q); return state; } parkNanos = nanos - elapsed; } // nanoTime may be slow; recheck before parking if (state < COMPLETING) LockSupport.parkNanos(this, parkNanos); } else LockSupport.park(this); } }
通過 CAS 和 LockSupport 的掛起/喚醒操作配合,阻塞當前執行緒,非同步地等待計算結果。
這裡有個 removeWaiter
方法,內部就是遍歷 waiters
,刪除超時和中斷的等待執行緒。
當非同步邏輯執行完成後,呼叫 report 方法:
// 為完成的任務返回結果或丟擲異常 private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
這裡用到了一個 outcome ,它是一個 Object 型別,作為返回結果,通過 set 方法可以對它進行設定:
// 除非該 future 已被設定或取消,否則將該 future 的結果設定為給定值。 // 該方法在成功完成計算後由 run 方法在內部呼叫。 protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = v; STATE.setRelease(this, NORMAL); // final state finishCompletion(); } }
這兩個方法都是通過 outcome
預設的返回值,返回預期的結果或異常。
public V resultNow() { switch (state()) { // Future.State case SUCCESS: @SuppressWarnings("unchecked") V result = (V) outcome; return result; case FAILED: throw new IllegalStateException("Task completed with exception"); case CANCELLED: throw new IllegalStateException("Task was cancelled"); default: throw new IllegalStateException("Task has not completed"); } } @Override public Throwable exceptionNow() { switch (state()) { // Future.State case SUCCESS: throw new IllegalStateException("Task completed with a result"); case FAILED: Object x = outcome; return (Throwable) x; case CANCELLED: throw new IllegalStateException("Task was cancelled"); default: throw new IllegalStateException("Task has not completed"); } }
最後是實現了 Runnable 的 run 方法:
public void run() { // 保證 NEW 狀態和 RUNNER 成功設定當前執行緒 if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 待執行的 Callable if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); // 執行 Callable ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // 為了防止並行呼叫 run ,直到 state 確定之前, runner 必須是非空的 runner = null; // 狀態必須在 runner 置空後重新讀取,以防止洩漏中斷 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
這裡涉及兩個方法,第一個是 setException(ex)
:
// 導致該future報告一個{@link ExecutionException},並將給定的可丟擲物件作為其原因,除非該future已經被設定或取消。 protected void setException(Throwable t) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = t; STATE.setRelease(this, EXCEPTIONAL); // final state finishCompletion(); } }
另一個是 handlePossibleCancellationInterrupt 方法:
/** * 確保任何來自可能的 cancel(true) 的中斷只在 run 或 runAndReset 時交付給任務。 */ private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // 我們想清除可能從cancel(true)接收到的所有中斷。 // 然而,允許使用中斷作為任務與其呼叫者通訊的獨立機制,並沒有辦法只清除取消中斷。 // Thread.interrupted(); }
最後是 runAndReset 方法:
protected boolean runAndReset() { if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return false; boolean ran = false; // flag 表示正常執行結束 int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; // if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; // 當正常執行結束,且 state 一開始就是 NEW 時,表示可以執行並重置。 }
執行計算時不設定其結果,然後將該 future 重置為初始狀態,如果計算遇到異常或被取消,則不這樣做。這是為本質上執行多次的任務設計的。
run 和 runAndReset 都用到了一個 RUNNER
, 最後就來揭祕一下這個東西:
private static final VarHandle STATE; private static final VarHandle RUNNER; private static final VarHandle WAITERS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATE = l.findVarHandle(FutureTask.class, "state", int.class); RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class); WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } // Reduce the risk of rare disastrous classloading in first call to // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class<?> ensureLoaded = LockSupport.class; }
MethodHandles.lookup()
建立一個MethodHandles.Lookup
物件,該物件可以建立所有存取許可權的方法,包括public
,protected
,private
,default
。
VarHandle
主要用於動態運算元組的元素或物件的成員變數。VarHandle
通過 MethodHandles
來獲取範例,然後呼叫 VarHandle
的方法即可動態操作指定陣列的元素或指定物件的成員變數。
到此這篇關於Java 多執行緒並行FutureTask的文章就介紹到這了,更多相關Java 多執行緒並行內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45