首頁 > 軟體

Queue佇列中join()與task_done()的關係及說明

2023-02-27 06:01:04

join()與task_done()的關係

在網上大多關於join()與task_done()的結束原話是這樣的:

  • Queue.task_done() 在完成一項工作之後,Queue.task_done()函數向任務已經完成的佇列傳送一個訊號 
  • Queue.join() 實際上意味著等到佇列為空,再執行別的操作

但是可能很多人還是不太理解,這裡以我自己的理解來闡述這兩者的關聯。

理解

如果執行緒裡每從佇列裡取一次,但沒有執行task_done(),則join無法判斷佇列到底有沒有結束,在最後執行個join()是等不到結果的,會一直掛起。

可以理解為,每task_done一次 就從佇列裡刪掉一個元素,這樣在最後join的時候根據佇列長度是否為零來判斷佇列是否結束,從而執行主執行緒。

下面看個自己寫的例子:

下面這個例子,會在join()的地方無限掛起,因為join在等佇列清空,但是由於沒有task_done,它認為佇列還沒有清空,還在一直等。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''threading test'''
import threading
import queue
from time import sleep
#之所以為什麼要用執行緒,因為執行緒可以start後繼續執行後面的主執行緒,可以put資料,如果不是執行緒直接在get阻塞。
class Mythread(threading.Thread):
 def __init__(self,que):
 threading.Thread.__init__(self)
 self.queue = que
 def run(self):
 while True:
 sleep(1)
 if self.queue.empty(): #判斷放到get前面,這樣可以,否則佇列最後一個取完後就空了,直接break,走不到print
 break
 item = self.queue.get()
 print(item,'!')
 #self.queue.task_done()
 return
que = queue.Queue()
tasks = [Mythread(que) for x in range(1)]
for x in range(10):
 
 que.put(x) #快速生產
for x in tasks:
 t = Mythread(que) #把同一個佇列傳入2個執行緒
 t.start()
 
que.join()
 
print('---success---')

如果把self.queue.task_done()  註釋去掉,就會順利執行完主程式。

這就是“ Queue.task_done()函數向任務已經完成的佇列傳送一個訊號”這句話的意義,能夠讓join()函數能判斷出佇列還剩多少,是否清空了。

而事實上我們看下queue的原始碼可以看出確實是執行一次未完成佇列減一:

 def task_done(self):
 '''Indicate that a formerly enqueued task is complete.
 Used by Queue consumer threads. For each get() used to fetch a task,
 a subsequent call to task_done() tells the queue that the processing
 on the task is complete.
 If a join() is currently blocking, it will resume when all items
 have been processed (meaning that a task_done() call was received
 for every item that had been put() into the queue).
 Raises a ValueError if called more times than there were items
 placed in the queue.
 '''
 with self.all_tasks_done:
 unfinished = self.unfinished_tasks - 1
 if unfinished <= 0:
 if unfinished < 0:
 raise ValueError('task_done() called too many times')
 self.all_tasks_done.notify_all()
 self.unfinished_tasks = unfinished
 

快速生產-快速消費

上面的演示程式碼是快速生產-慢速消費的場景,我們可以直接用task_done()與join()配合,來讓empty()判斷出佇列是否已經結束。

當然,queue我們可以正確判斷是否已經清空,但是執行緒裡的get佇列是不知道,如果沒有東西告訴它,佇列空了,因此get還會繼續阻塞,那麼我們就需要在get程式中加一個判斷,如果empty()成立,break退出迴圈,否則get()還是會一直阻塞。

慢速生產-快速消費

但是如果生產者速度與消費者速度相當,或者生產速度小於消費速度,則靠task_done()來實現佇列減一則不靠譜,佇列會時常處於供不應求的狀態,常為empty,所以用empty來判斷則不靠譜。

那麼這種情況會導致 join可以判斷出佇列結束了,但是執行緒裡不能依靠empty()來判斷執行緒是否可以結束。

我們可以在消費佇列的每個執行緒最後塞入一個特定的“標記”,在消費的時候判斷,如果get到了這麼一個“標記”,則可以判定佇列結束了,因為生產佇列都結束了,也不會再新增了。

程式碼如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''threading test'''
import threading
import queue
from time import sleep
#之所以為什麼要用執行緒,因為執行緒可以start後繼續執行後面的主執行緒,可以put資料,如果不是執行緒直接在get阻塞。
class Mythread(threading.Thread):
 def __init__(self,que):
 threading.Thread.__init__(self)
 self.queue = que
 def run(self):
 while True:
 item = self.queue.get()
 self.queue.task_done() #這裡要放到判斷前,否則取最後最後一個的時候已經為空,直接break,task_done執行不了,join()判斷佇列一直沒結束
 if item == None:
 break
 print(item,'!')
return
que = queue.Queue()
tasks = [Mythread(que) for x in range(1)]
 #快速生產
for x in tasks:
 t = Mythread(que) #把同一個佇列傳入2個執行緒
 t.start()
for x in range(10):
 sleep(1)
 que.put(x)
for x in tasks:
 que.put(None)
que.join()
print('---success---')

注意點

put佇列完成的時候千萬不能用task_done(),否則會報錯:

task_done() called too many times

因為該方法僅僅表示get成功後,執行的一個標記。

總結

以上為個人經驗,希望能給大家一個參考,也希望大家多多支援it145.com。


IT145.com E-mail:sddin#qq.com