首頁 > 軟體

java並行使用CountDownLatch在生產環境翻車剖析

2022-08-10 14:04:15

前言

大家好,我是小郭,之前分享了CountDownLatch的使用,我們知道用來控制並行流程的同步工具,主要的作用是為了等待多個執行緒同時完成任務後,在進行主執行緒任務。

萬萬沒想到,在生產環境中竟然翻車了,因為沒有考慮到一些場景,導致了CountDownLatch出現了問題,接下來來分享一下由於CountDownLatch導致的問題。

【執行緒】並行流程控制的同步工具-CountDownLatch

需求背景

先簡單介紹下業務場景,針對使用者批次下載的檔案進行修改上傳

為了提高執行的速度,所以在採用執行緒池去執行 下載-修改-上傳 的操作,並在全部執行完之後統一提交儲存檔案地址到資料庫,於是加入了CountDownLatch來進行控制。

具體實現

根據服務本身情況,自定義一個執行緒池

public static ExecutorService testExtcutor() {
        return new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));
    }

模擬執行

public static void main(String[] args) {
        // 下載檔案總數
        List<Integer> resultList = new ArrayList<>(100);
        IntStream.range(0,100).forEach(resultList::add);
        // 下載檔案分段
        List<List<Integer>> split = CollUtil.split(resultList, 10);
        ExecutorService executorService = BaseThreadPoolExector.testExtcutor();
        CountDownLatch countDownLatch = new CountDownLatch(100);
        for (List<Integer> list : split) {
            executorService.execute(() -> {
                list.forEach(i ->{
                    try {
                        // 模擬業務操作
                        Thread.sleep(500);
                        System.out.println("任務進入");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        System.out.println(e.getMessage());
                    } finally {
                        System.out.println(countDownLatch.getCount());
                        countDownLatch.countDown();
                    }
                });
            });
        }
        try {
            countDownLatch.await();
            System.out.println("countDownLatch.await()");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

一開始我個人感覺沒有什麼問題,反正finally都能夠做減一的操作,到最後呼叫await方法,進行主執行緒任務

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@300ffa5d rejected from java.util.concurrent.ThreadPoolExecutor@1f17ae12[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
  at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
  at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
  at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
  at Thread.executor.executorTestBlock.main(executorTestBlock.java:28)
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown

由於任務數量較多,阻塞佇列中已經塞滿了,所以預設的拒絕策略,當佇列滿時,處理策略報錯異常,

要注意這個異常是執行緒池,自己丟擲的,不是我們迴圈裡面列印出來的,

這也造成了,線上這個執行緒池被阻塞了,他永遠也呼叫不到await方法,

利用jstack,我們就能夠看到有問題

"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007ff6198b7000 nid=0xa903 waiting on condition [0x0000700001c64000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
"pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00007ff6198b6800 nid=0x5903 waiting on condition [0x0000700001b61000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

解決方案

調大阻塞佇列,但是問題來了,到底多少阻塞佇列才是大呢,如果太大了會不由又造成記憶體溢位等其他的問題

在第一個的基礎上,我們修改了拒絕策略,當觸發拒絕策略的時候,用呼叫者所在的執行緒來執行任務

 public static ThreadPoolExecutor queueExecutor(BlockingQueue<Runnable> workQueue){
        return new ThreadPoolExecutor(
                size,
                size,
                0L,
                TimeUnit.SECONDS,
                workQueue,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

你可能又會想說,會不會任務數量太多,導致呼叫者所在的執行緒執行不過來,任務提交的效能急劇下降

那我們就應該自定義拒絕策略,將這下排隊的訊息記錄下來,採用補償機制的方式去執行

同時也要注意上面的那個異常是執行緒池丟擲來的,我們自己也需要將執行緒池進行try catch,記錄問題資料,並且在finally中執行countDownLatch.countDown來避免,執行緒池的使用

總結

目前根據業務部門的反饋,業務實際中任務數不很特別多的情況,所以暫時先採用了第二種方式去解決這個線上問題

在這裡我們也可以看到,如果沒有正確的關閉countDownLatch,可能會導致一直等待,這也是我們需要注意的。

工具雖然好,但是依然要注意他帶來的問題,沒有正確的去處理好,引發的一系列連鎖反應。

以上就是java並行使用CountDownLatch在生產環境翻車剖析的詳細內容,更多關於java並行CountDownLatch的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com