<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
業務互動的過程中涉及到了很多關於SFTP下載的問題,因此在程式碼中定義了一些執行緒池,使用中發現了一些問題,
程式碼類似如下所示:
public class ExecutorTest { private static ExecutorService es = new ThreadPoolExecutor(2, 100, 1000, TimeUnit.MILLISECONDS , new ArrayBlockingQueue<>(10)); public static void main(String[] args) { for (int i = 0; i < 10; i++) { es.submit(new MyThread()); } } static class MyThread implements Runnable { @Override public void run() { for (; ; ) { System.out.println("Thread name=" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
如上面的程式碼所示,定義了一個初始容量為2,最大容量為100,佇列長度為10的執行緒池,期待的執行結果為:
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-3
Thread name=pool-1-thread-4
Thread name=pool-1-thread-5
Thread name=pool-1-thread-6
Thread name=pool-1-thread-7
Thread name=pool-1-thread-8
Thread name=pool-1-thread-9
Thread name=pool-1-thread-10
Thread name=pool-1-thread-3
Thread name=pool-1-thread-5
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-4
Thread name=pool-1-thread-10
Thread name=pool-1-thread-7
Thread name=pool-1-thread-6
Thread name=pool-1-thread-9
Thread name=pool-1-thread-8
Thread name=pool-1-thread-3
Thread name=pool-1-thread-4
Thread name=pool-1-thread-1
Thread name=pool-1-thread-5
Thread name=pool-1-thread-2
Thread name=pool-1-thread-8
Thread name=pool-1-thread-6
Thread name=pool-1-thread-7
Thread name=pool-1-thread-9
Thread name=pool-1-thread-10
期待十個執行緒都可以執行,但實際的執行效果如下:
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
對比可以看出,用上面的方式定義執行緒池,最終只有兩個執行緒可以執行,即執行緒池的初始容量大小。其餘執行緒都被阻塞到了佇列ArrayBlockingQueue<>(10)
我們知道,Executors框架提供了幾種常見的執行緒池分別為:
如果將程式碼中自定義的執行緒池改為 :
private static ExecutorService es = Executors.newCachedThreadPool();
執行發現,提交的十個執行緒都可以執行
Executors.newCachedThreadPool()的原始碼如下:
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
通過對比發現,newCachedThreadPool使用的是 SynchronousQueue<>()而我們使用的是ArrayBlockingQueue<>(10) 因此可以很容易的發現問題出在佇列上。
將ArrayBlockingQueue改為SynchronousQueue 問題解決,程式碼如下:
public class ExecutorTest { private static ExecutorService es = new ThreadPoolExecutor(2, 100, 1000, TimeUnit.MILLISECONDS , new SynchronousQueue<>()); private static ExecutorService es2 = Executors.newCachedThreadPool(); public static void main(String[] args) { for (int i = 0; i < 10; i++) { es.submit(new MyThread()); } } static class MyThread implements Runnable { @Override public void run() { for (; ; ) { System.out.println("Thread name=" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
從圖上我們可以看到,兩個佇列都繼承了AbstractQueue實現了BlockingQueue介面,因此功能應該相似
* <p>Synchronous queues are similar to rendezvous channels used in * CSP and Ada. They are well suited for handoff designs, in which an * object running in one thread must sync up with an object running * in another thread in order to hand it some information, event, or * task.
SynchronousQueue類似於一個傳遞通道,只是通過他傳遞某個元素,並沒有任何容量,只有當第一個元素被取走,才能在給佇列新增元素。
* A bounded {@linkplain BlockingQueue blocking queue} backed by an * array. This queue orders elements FIFO (first-in-first-out). The * <em>head</em> of the queue is that element that has been on the * queue the longest time. The <em>tail</em> of the queue is that * element that has been on the queue the shortest time. New elements * are inserted at the tail of the queue, and the queue retrieval * operations obtain elements at the head of the queue.
ArrayBlockingQueue從定義來看就是一個普通的佇列,先入先出,當佇列為空時,獲取資料的執行緒會被阻塞,當佇列滿時,新增佇列的執行緒會被阻塞,直到佇列可用。
從上面佇列的定義中可以看出,導致執行緒池沒有按照預期執行的原因不是因為佇列的問題,應該是關於執行緒池在提交任務時,從佇列取資料的方式不同導致的。
* <dt>Queuing</dt> * * <dd>Any {@link BlockingQueue} may be used to transfer and hold * submitted tasks. The use of this queue interacts with pool sizing: * * <ul> * * <li> If fewer than corePoolSize threads are running, the Executor * always prefers adding a new thread * rather than queuing.</li> * * <li> If corePoolSize or more threads are running, the Executor * always prefers queuing a request rather than adding a new * thread.</li> * * <li> If a request cannot be queued, a new thread is created unless * this would exceed maximumPoolSize, in which case, the task will be * rejected.</li>
從說明中可以看到,如果正在執行的執行緒數必初始容量corePoolSize小,那麼Executor會從建立一個新執行緒去執行任務,如果正在執行的執行緒數必corePoolSize大,那麼Executor會將新提交的任務放到阻塞佇列,除非當佇列的個數超過了佇列的最大長度maxmiumPooSize。
從原始碼中找到關於提交任務的方法:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
從原始碼中看到 subimit實際上是呼叫了execute方法
execute方法的原始碼:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
原始碼中可以看出,提交任務時,首先會判斷正在執行的執行緒數是否小於corePoolSize,如果條件成立那麼會直接建立執行緒並執行任務。如果條件不成立,且佇列沒有滿,那麼將任務放到佇列,如果條件不成立但是佇列滿了,那麼同樣也新建立執行緒並執行任務。
到此這篇關於Java如何自定義執行緒池中佇列的文章就介紹到這了,更多相關Java 佇列內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45