首頁 > 軟體

python中celery的基本使用詳情

2022-10-02 14:00:46

1.基本介紹

Celery 是由Python 編寫的簡單,靈活,可靠的用來處理大量資訊的分散式系統,它同時提供操作和維護分散式系統所需的工具。Celery 專注於實時任務處理,支援任務排程。

簡單的說,它就是一個分散式佇列的管理工具,用celery提供的介面快速實現並管理一個分散式的任務佇列。

有一點我們需要搞清楚,Celery 本身並不是任務佇列,它是一個分散式佇列的管理工具,Celery封裝好了操作常見任務佇列的各種操作,比如說從監聽某個任務佇列並從該佇列中拿到資料進行消費。

2.使用場景

它可以讓任務的執行完全脫離主程式,甚至可以被分配到其他主機上執行。我們通常使用它來實現非同步任務(async task)和定時任務(crontab)。

  • 非同步任務: 將耗時操作任務提交給Celery去非同步執行,比如傳送簡訊/郵件、訊息推播、音視訊處理等等
  • 定時任務: 定時執行某件事情,比如每天資料統計

3.工作流程和組成部分

這裡用一張圖片說明下:

Celery的架構由三部分組成,訊息中介軟體(message broker),任務執行單元(worker)和任務執行結果儲存(task result store)組成。

訊息中介軟體:

Celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括RabbitMQ, Redis等等,官方推薦用rabbitMQ,因為它持久穩定。

任務執行單元:

WorkerCelery提供的任務執行的單元,worker並行的執行在分散式的系統節點中。

任務結果儲存:

Task result store用來儲存Worker執行的任務的結果,Celery支援以不同方式儲存任務的結果,包括AMQP, redis

另外, Celery還支援不同的並行和序列化的手段。

並行:Prefork, Eventlet, gevent, threads/single threaded

序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等 先安裝模組

pip install celery
pip install redis

4.Celery執行非同步任務

4.1 基礎使用

這裡專案結構如下:

第一步:先建立celery相關設定設定celery_object.py

import celery

# 執行如下命令: celery -A celery_object worker -l info

backend = "redis://127.0.0.1:6379/4"  # 設定redis的4號資料庫來存放結果
broker = "redis://127.0.0.1:6379/5"  # 設定redis的5號資料庫存放訊息中介軟體
celery_app = celery.Celery(
    "celery_demo",
    backend=backend,
    broker=broker,
    include=[
        "celery_task",
    ],
)

celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]

celery_app.conf.timezone = "Asia/Shanghai"  # 時區
celery_app.conf.enable_utc = False  # 是否使用UTC

引數說明:

  • backend 就是非同步任務執行完成以後,結果存放的地方。
  • broker 就是具體執行任務的工作節點。
  • celery.Celery()方法是範例化一個celery物件。

第二步:建立任務相關的檔案celery_task.py

import time

from celery_object import celery_app

@celery_app.task
def send_email(name):
    print("向%s傳送郵件..." % name)
    time.sleep(5)
    print("向%s傳送郵件完成" % name)
    return f"成功拿到{name}傳送的郵件!"

@celery_app.task
def send_msg(name):
    print("向%s傳送簡訊..." % name)
    time.sleep(5)
    print("向%s傳送簡訊完成" % name)
    return f"成功拿到{name}傳送的簡訊!"

通過@celery_app.task這樣的裝飾器,成功的把對應的函數變成對應celery的非同步worker函數。

緊接著我們在專案當前所在的目錄執行命令:

celery -A celery_object worker -l info
  • -A 指的是application應用物件
  • worker 就是工作人(固定寫法)
  • -l 指的是紀錄檔級別,這裡是列印info級別的紀錄檔

之後就可以有下面的輸出顯示就代表celery動成功:

之後我們就可以向celery生產任務了,建立produce_result.py檔案。

from celery_task import send_email, send_msg

if __name__ == "__main__":
    for i in range(10):
        result = send_email.delay(f"張三{i}")
        print(result.id)
        result2 = send_msg.delay(f"李四{i}")
        print(result2.id)

執行生產任務的程式,會看到如下的資料,這裡列印的就是任務ID。

然後在終端可以看到下面的東西,就代表celery成功的拿到佇列中任務 並進行消費了。

然後開啟我們的redis可以看到有對應的資料記錄。

與此同時 我們還可以檢視celery任務ID的狀態check_result.py寫入如下:

from celery.result import AsyncResult
from celery_object import celery_app

async_result = AsyncResult(id="d1c722fa-4ebf-432e-967e-a462bdefeac4", app=celery_app)
print("任務狀態:", async_result.status)
if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 將結果刪除
elif async_result.failed():
    print("執行失敗")
elif async_result.status == "PENDING":
    print("任務等待中被執行")
elif async_result.status == "RETRY":
    print("任務異常後正在重試")
elif async_result.status == "STARTED":
    print("任務已經開始被執行")

執行結果:

任務狀態: SUCCESS
成功拿到李四0傳送的簡訊!

到此這篇關於python中celery的基本使用詳情的文章就介紹到這了,更多相關python celery內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com