首頁 > 軟體

Python實現執行緒池之執行緒安全佇列

2022-05-25 22:02:00

本文範例為大家分享了Python實現執行緒池之執行緒安全佇列的具體程式碼,供大家參考,具體內容如下

一、執行緒池組成

一個完整的執行緒池由下面幾部分組成,執行緒安全佇列、任務物件、執行緒處理物件、執行緒池物件。其中一個執行緒安全的佇列是實現執行緒池和任務佇列的基礎,本節我們通過threading包中的互斥量threading.Lock()和條件變數threading.Condition()來實現一個簡單的、讀取安全的執行緒佇列。

二、執行緒安全佇列的實現

包括put、pop、get等方法,為保證執行緒安全,讀寫操作時要新增互斥鎖;並且pop操作可以設定等待時間以阻塞當前獲取元素的執行緒,當新元素寫入佇列時通過條件變數通知解除等待操作。

class ThreadSafeQueue(object):

    def __init__(self, max_size=0):
        self.queue = []
        self.max_size = max_size  # max_size為0表示無限大
        self.lock = threading.Lock()  # 互斥量
        self.condition = threading.Condition()  # 條件變數

    def size(self):
        """
        獲取當前佇列的大小
        :return: 佇列長度
        """
        # 加鎖
        self.lock.acquire()
        size = len(self.queue)
        self.lock.release()
        return size

    def put(self, item):
        """
        將單個元素放入佇列
        :param item:
        :return:
        """
        # 佇列已滿 max_size為0表示無限大
        if self.max_size != 0 and self.size() >= self.max_size:
            return ThreadSafeException()

        # 加鎖
        self.lock.acquire()
        self.queue.append(item)
        self.lock.release()
        self.condition.acquire()
        # 通知等待讀取的執行緒
        self.condition.notify()
        self.condition.release()

        return item

    def batch_put(self, item_list):
        """
        批次新增元素
        :param item_list:
        :return:
        """
        if not isinstance(item_list, list):
            item_list = list(item_list)

        res = [self.put(item) for item in item_list]

        return res

    def pop(self, block=False, timeout=0):
        """
        從佇列頭部取出元素
        :param block: 是否阻塞執行緒
        :param timeout: 等待時間
        :return:
        """
        if self.size() == 0:
            if block:
                self.condition.acquire()
                self.condition.wait(timeout)
                self.condition.release()
            else:
                return None

        # 加鎖
        self.lock.acquire()
        item = None
        if len(self.queue):
            item = self.queue.pop()
        self.lock.release()

        return item

    def get(self, index):
        """
        獲取指定位置的元素
        :param index:
        :return:
        """
        if self.size() == 0 or index >= self.size():
            return None

        # 加鎖
        self.lock.acquire()
        item = self.queue[index]
        self.lock.release()

        return item


class ThreadSafeException(Exception):
    pass

三、測試邏輯

3.1、測試阻塞邏輯

def thread_queue_test_1():
    thread_queue = ThreadSafeQueue(10)

    def producer():
        while True:
            thread_queue.put(random.randint(0, 10))
            time.sleep(2)

    def consumer():
        while True:
            print('current time before pop is %d' % time.time())
            item = thread_queue.pop(block=True, timeout=3)
            # item = thread_queue.get(2)
            if item is not None:
                print('get value from queue is %s' % item)
            else:
                print(item)
            print('current time after pop is %d' % time.time())

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

測試結果:

我們可以看到生產者執行緒每隔2s向佇列寫入一個元素,消費者執行緒當無資料時預設阻塞3s。通過執行時間發現消費者執行緒確實發生了阻塞,當生產者寫入資料時結束當前等待操作。

3.2、測試讀寫加鎖邏輯

def thread_queue_test_2():
    thread_queue = ThreadSafeQueue(10)

    def producer():
        while True:
            thread_queue.put(random.randint(0, 10))
            time.sleep(2)

    def consumer(name):
        while True:
            item = thread_queue.pop(block=True, timeout=1)
            # item = thread_queue.get(2)
            if item is not None:
                print('%s get value from queue is %s' % (name, item))
            else:
                print('%s get value from queue is None' % name)

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer, args=('thread1',))
    t3 = threading.Thread(target=consumer, args=('thread2',))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

測試結果:

生產者還是每2s生成一個元素寫入佇列,消費者開啟兩個執行緒進行消費,預設阻塞時間為1s,列印結果顯示通過加鎖確保每次只有一個執行緒能獲取資料,保證了執行緒讀寫的安全。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援it145.com。


IT145.com E-mail:sddin#qq.com