首頁 > 軟體

Java 協程 Quasar詳解

2022-07-07 10:01:08

前言

在程式語言的這個圈子裡,各種語言之間的對比似乎就一直就沒有停過,像什麼古早時期的"PHP是世界上最好的語言"就不提了,最近我在摸魚的時候,看到不少文章都在說"Golang效能吊打Java"。作為一個寫了好幾年java的javaer,這我怎麼能忍?於是在網上看了一些對比golang和java的文章,其中戳中java痛點、也是golang被吹上天的一條,就是對多執行緒並行的支援了。先看一段描述:

Go從語言層面原生支援並行,並且使用簡單,Go語言中的並行基於輕量級執行緒Goroutine,建立成本很低,單個Go應用也可以充分利用CPU多核,編寫高並行伺服器端軟體簡單,執行效能好,很多情況下完全不需要考慮鎖機制以及由此帶來的各種問題。

看到這,我的心瞬間涼了大半截,真的是字字扎心。雖然說java裡的JUC包已經幫我們封裝好了很多並行工具,但實際高並行的環境中我們還要考慮到各種鎖的使用,以及伺服器效能瓶頸、限流熔斷等非常多方面的問題。

再說回go,前面提到的這個goroutine究竟是什麼東西?其實,輕量級執行緒goroutine也可以被稱為協程,得益於go中的排程器以及GMP模型,go程式會智慧地將goroutine中的任務合理地分配給每個 CPU。

好了,其實上面說的這一大段我也不懂,都是向寫go的哥們兒請教來的,總之就是go的並行效能非常優秀就是了。不過這都不是我們要說的重點,今天我們要討論的是如何在Java中使用協程。

協程是什麼?

我們知道,執行緒在阻塞狀態和可執行狀態的切換,以及執行緒間的上下文切換都會造成效能的損耗。為了解決這些問題,引入協程coroutine這一概念,就像在一個程序中允許存在多個執行緒,在一個執行緒中,也可以存在多個協程。

那麼,使用協程究竟有什麼好處呢?

首先,執行效率高。執行緒的切換由作業系統核心執行,消耗資源較多。而協程由程式控制,在使用者態執行,不需要從使用者態切換到核心態,我們也可以理解為,協程是一種程序自身來排程任務的排程模式,因此協程間的切換開銷遠小於執行緒切換。

其次,節省資源。因為協程在本質上是通過分時複用了一個單執行緒,因此能夠節省一定的資源。

類似於執行緒的五種狀態切換,協程間也存在狀態的切換,下面這張圖展示了協程排程器內部任務的流轉。

綜合上面這些角度來看,和原生支援協程的go比起來,java在多執行緒並行上還真的是不堪一擊。但是,雖然在Java官方的jdk中不能直接使用協程,但是,有其他的開源框架藉助動態修改位元組碼的方式實現了協程,就比如我們接下來要學習的Quasar。

Quasar使用

Quasar是一個開源的Java協程框架,通過利用Java instrument技術對位元組碼進行修改,使方法掛起前後可以儲存和恢復jvm棧幀,方法內部已執行到的位元組碼位置也通過增加狀態機的方式記錄,在下次恢復執行可直接跳轉至最新位置。

Quasar專案最後更新時間為2018年,版本停留在0.8.0,但是我在直接使用這個版本時報了一個錯誤:

這個錯誤的大意就是這個class檔案是使用的高版本jdk編譯的,所以你在低版本的jdk上當然無法執行了。這裡major版本號54對應的是jdk10,而我使用的是jdk8,無奈降級試了一下低版本,果然0.7.10可以使用:

<dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>quasar-core</artifactId>
    <version>0.7.10</version>
</dependency>

在我們做好準備工作後,下面就寫幾個例子來感受一下協程的魅力吧。

1、執行時間

下面我們模擬一個簡單的場景,假設我們有一個任務,平均執行時間為1秒,分別測試一下使用執行緒和協程並行執行10000次需要消耗多少時間。

先通過執行緒進行呼叫,直接使用Executors執行緒池:

public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch=new CountDownLatch(10000);
    long start = System.currentTimeMillis();
    ExecutorService executor= Executors.newCachedThreadPool();
    for (int i = 0; i < 10000; i++) {
        executor.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
    long end = System.currentTimeMillis();
    System.out.println("Thread use:"+(end-start)+" ms");
}

檢視執行時間:

好了,下面我們再用Quasar中的協程跑一下和上面相同的流程。這裡我們要使用的是Quasar中的Fiber,它可以被翻譯為協程纖程,建立Fiber的型別主要可分為下面兩類:

public Fiber(String name, FiberScheduler scheduler, int stackSize, SuspendableRunnable target);
public Fiber(String name, FiberScheduler scheduler, int stackSize, SuspendableCallable<V> target);

Fiber中可以執行無返回值的SuspendableRunnable或有返回值的SuspendableCallable,看這個名字也知道區別就是java中的RunnableCallable的區別了。其餘引數都可以省略,name為協程的名稱,scheduler是排程器,預設使用FiberForkJoinSchedulerstackSize指定用於儲存fiber呼叫棧資訊的stack大小。

在下面的程式碼中,使用了Fiber.sleep()方法進行協程的休眠,和Thread.sleep()非常類似。

public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch=new CountDownLatch(10000);
    long start = System.currentTimeMillis();

    for (int i = 0; i < 10000; i++) {
        new Fiber<>(new SuspendableRunnable(){
            @Override
            public Integer run() throws SuspendExecution, InterruptedException {
                Fiber.sleep(1000);
                countDownLatch.countDown();
            }
        }).start();
    }

    countDownLatch.await();
    long end = System.currentTimeMillis();
    System.out.println("Fiber use:"+(end-start)+" ms");
}

直接執行,報了一個警告:

QUASAR WARNING: Quasar Java Agent isn't running. If you're using another instrumentation method you can ignore this message; otherwise, please refer to the Getting Started section in the Quasar documentation.

還記得我們前面說過的Quasar生效的原理是基於Java instrument技術嗎,所以這裡需要給它新增一個代理Agent。找到本地maven倉庫中已經下好的jar包,在VM options中新增引數:

-javaagent:E:Apachemaven-repositorycoparalleluniversequasar-core.7.10quasar-core-0.7.10.jar

這次執行時就沒有提示警告了,檢視一下執行時間:

執行時間只有使用執行緒池時的一半多一點,確實能大大縮短程式的效率。

2、記憶體佔用

在測試完執行時間後,我們再來測試一下執行記憶體佔用的對比。通過下面程式碼嘗試在本地啟動100萬個執行緒:

public static void main(String[] args) {
    for (int i = 0; i < 1000000; i++) {
        new Thread(() -> {
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

本來以為會報OutOfMemoryError,但是沒想到的是我的電腦直接直接卡死了…而且不是一次,試了幾次都是以卡死只能重啟電腦而結束。好吧,我選擇放棄,那麼下面再試試啟動100萬個Fiber協程。

public static void main(String[] args) throws Exception {
    CountDownLatch countDownLatch=new CountDownLatch(10000);
    for (int i = 0; i < 1000000; i++) {
        int finalI = i;
        new Fiber<>((SuspendableCallable<Integer>)()->{
            Fiber.sleep(100000);
            countDownLatch.countDown();
            return finalI;
        }).start();
    }
    countDownLatch.await();
    System.out.println("end");
}

程式能夠正常執行結束,看樣子使用的記憶體真的比執行緒少很多。上面我故意使每個協程結束的時間拖得很長,這樣我們就可以在執行過程中使用Java VisualVM檢視記憶體的佔用情況了:

可以看到在使用Fiber的情況下只使用了1G多一點的記憶體,平均到100萬個協程上也就是說每個Fiber只佔用了1Kb左右的記憶體空間,和Thread執行緒比起來真的是非常的輕量級。

從上面這張圖中我們也可以看到,執行了非常多的ForkJoinPool,它們又起到了什麼作用呢?我們在前面說過,協程是由程式控制在使用者態進行切換,而Quasar中的排程器就使用了一個或多個ForkJoinPool來完成對Fiber的排程。

3、原理與應用

這裡簡單介紹一下Quasar的原理,在編譯時框架會對程式碼進行掃描,如果方法帶有@Suspendable註解,或丟擲了SuspendExecution,或在組態檔META-INF/suspendables中指定該方法,那麼Quasar就會修改生成的位元組碼,在park掛起方法的前後,插入一些位元組碼。

這些位元組碼會記錄此時協程的執行狀態,例如相關的區域性變數與運算元棧,然後通過丟擲異常的方式將cpu的控制權從當前協程交回到控制器,此時控制器可以再排程另外一個協程執行,並通過之前插入的那些位元組碼恢復當前協程的執行狀態,使程式能繼續正常執行。

回頭看一下前面例子中的SuspendableRunnableSuspendableCallable,它們的run方法上都丟擲了SuspendExecution,其實這並不是一個真正的異常,僅作為識別掛起方法的宣告,在實際執行中不會丟擲。當我們建立了一個Fiber,並在其中呼叫了其他方法時,如果想要Quasar的排程器能夠介入,那麼必須在使用時層層丟擲這個異常或新增註解。

看一下簡單的程式碼書寫的範例:

public void request(){
    new Fiber<>(new SuspendableRunnable() {
        @Override
        public void run() throws SuspendExecution, InterruptedException {
            String content = sendRequest();
            System.out.println(content);
        }
    }).start();
}
private String sendRequest() throws SuspendExecution {
    return realSendRequest();
}
private String realSendRequest() throws SuspendExecution{
    HttpResponse response = HttpRequest.get("http://127.0.0.1:6879/name").execute();
    String content = response.body();
    return content;
}

需要注意的是,如果在方法內部已經通過try/catch的方式捕獲了Exception,也應該再次手動丟擲這個SuspendExecution異常。

總結

本文介紹了Quasar框架的簡單使用,其具體的實現原理比較複雜,暫時就不在這裡進行討論,後面打算單獨拎出來進行分析。另外,目前已經有不少其他的框架中已經整合了Quasar,例如同樣是Parallel Universe下的Comsat專案,能夠提供了HTTP和DB存取等功能。

雖然現在想要在Java中使用協程還只能使用這樣的第三方的框架,但是也不必灰心,在OpenJDK 16中已經加入了一個名為Project Loom的專案, 在OpenJDK Wiki上可以看到對它的介紹,它將使用Fiber輕量級使用者模式執行緒,從jvm層面對多執行緒技術進行徹底的改變,使用新的程式設計模型,使輕量級執行緒的並行也能夠適用於高吞吐量的業務場景。

到此這篇關於Java 協程 Quasar詳解的文章就介紹到這了,更多相關Java Quasar內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com