首頁 > 軟體

Django中celery的使用專案範例

2022-07-06 18:02:14

1、django應用Celery

django框架請求/響應的過程是同步的,框架本身無法實現非同步響應。

但是我們在專案過程中會經常會遇到一些耗時的任務, 比如:傳送郵件、傳送簡訊、巨量資料統計等等,這些操作耗時長,同步執行對使用者體驗非常不友好,那麼在這種情況下就需要實現非同步執行。

非同步執行前端一般使用ajax,後端使用Celery。

2 、專案應用

django專案應用celery,主要有兩種任務方式,一是非同步任務(釋出者任務),一般是web請求,二是定時任務。

celery組成

Celery是由Python開發、簡單、靈活、可靠的分散式任務佇列,是一個處理非同步任務的框架,其本質是生產者消費者模型,生產者傳送任務到訊息佇列,消費者負責處理任務。Celery側重於實時操作,但對排程支援也很好,其每天可以處理數以百萬計的任務。特點:

簡單:熟悉celery的工作流程後,設定使用簡單

高可用:當任務執行失敗或執行過程中發生連線中斷,celery會自動嘗試重新執行任務

快速:一個單程序的celery每分鐘可處理上百萬個任務

靈活:幾乎celery的各個元件都可以被擴充套件及自客製化

Celery由三部分構成:

訊息中介軟體(Broker):官方提供了很多備選方案,支援RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推薦RabbitMQ

任務執行單元(Worker):任務執行單元,負責從訊息佇列中取出任務執行,它可以啟動一個或者多個,也可以啟動在不同的機器節點,這就是其實現分散式的核心

結果儲存(Backend):官方提供了諸多的儲存方式支援:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch等

架構如下:

工作原理:

任務模組Task包含非同步任務和定時任務。其中,非同步任務通常在業務邏輯中被觸發並行往訊息佇列,而定時任務由Celery Beat程序週期性地將任務發往訊息佇列;

任務執行單元Worker實時監視訊息佇列獲取佇列中的任務執行;

Woker執行完任務後將結果儲存在Backend中;

本文使用的是redis資料庫作為訊息中介軟體和結果儲存資料庫

1.非同步任務redis

1.安裝庫

pip install celery
pip install redis

2.celery.py

在主專案目錄下,新建 celery.py 檔案:

import os
import django
from celery import Celery
from django.conf import settings
 
# 設定系統環境變數,安裝django,必須設定,否則在啟動celery時會報錯
# celery_study 是當前專案名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_study.settings')
django.setup()
 
celery_app = Celery('celery_study')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

注意:是和settings.py檔案同目錄,一定不能建立在專案根目錄,不然會引起 celery 這個模組名的命名衝突  

同時,在主專案的init.py中,新增如下程式碼:

from .celery import celery_app
 
__all__ = ['celery_app']

3.settings.py

在組態檔中設定對應的redis設定:

# Broker設定,使用Redis作為訊息中介軟體
BROKER_URL = 'redis://127.0.0.1:6379/0' 
 
# BACKEND設定,這裡使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' 
 
# 結果序列化方案
CELERY_RESULT_SERIALIZER = 'json' 
 
# 任務結果過期時間,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 
 
# 時區設定
CELERY_TIMEZONE='Asia/Shanghai'   
 
# 指定匯入的任務模組,可以指定多個
#CELERY_IMPORTS = (     
#    'other_dir.tasks',
#)

注意:所有設定的官方檔案Configuration and defaults — Celery 5.2.0b3 documentation  

4.tasks.py

在子應用下建立各自對應的任務檔案tasks.py(必須是tasks.py這個名字,不允許修改)

from celery import shared_task
 
@shared_task
def add(x, y):
    return x + y
 
@shared_task
def mul(x, y):
    return x * y
 
@shared_task
def xsum(numbers):
    return sum(numbers)

5.呼叫任務

from .tasks import *
# Create your views here.
 
def task_add_view(request):
    add.delay(100,200)
    return HttpResponse(f'呼叫函數結果')

6.啟動celery

pip install eventlet
celery  -A celery_study worker  -l debug -P eventlet

注意 :celery_study是專案名

使用redis時,有可能會出現如下類似的異常

AttributeError: 'str' object has no attribute 'items'

這是由於版本差異,需要解除安裝已經安裝的python環境中的 redis 庫,重新指定安裝特定版本(celery4.x以下適用 redis2.10.6, celery4.3以上使用redis3.2.0以上):

xxxxxxxxxx pip install redis==2.10.6

7.獲取任務結果

在 views.py 中,通過 AsyncResult.get() 獲取結果

from celery import result
def get_result_by_taskid(request):
    task_id = request.GET.get('task_id')
	# 非同步執行
    ar = result.AsyncResult(task_id)
 
    if ar.ready():
        return JsonResponse({'status': ar.state, 'result': ar.get()})
    else:
        return JsonResponse({'status': ar.state, 'result': ''})

AsyncResult類的常用的屬性和方法:

  • state: 返回任務狀態,等同status;
  • task_id: 返回任務id;
  • result: 返回任務結果,同get()方法;
  • ready(): 判斷任務是否執行以及有結果,有結果為True,否則False;
  • info(): 獲取任務資訊,預設為結果;
  • wait(t): 等待t秒後獲取結果,若任務執行完畢,則不等待直接獲取結果,若任務在執行中,則wait期間一直阻塞,直到超時報錯;
  • successful(): 判斷任務是否成功,成功為True,否則為False;

2.定時任務

在第一步的非同步任務的基礎上,進行部分修改即可

1.settings.py

from celery.schedules import crontab
 
CELERYBEAT_SCHEDULE = {
    'mul_every_30_seconds': {
         # 任務路徑
        'task': 'celery_app.tasks.mul',
         # 每30秒執行一次
        'schedule': 5,
        'args': (14, 5)
    }
}

說明(更多內容見檔案:Periodic Tasks — Celery 5.2.0b3 documentation):

  • task:任務函數
  • schedule:執行頻率,可以是整型(秒數),也可以是timedelta物件,也可以是crontab物件,也可以是自定義類(繼承celery.schedules.schedule)
  • args:位置引數,列表或元組
  • kwargs:關鍵字引數,字典
  • options:可選引數,字典,任何 apply_async() 支援的引數
  • relative:預設是False,取相對於beat的開始時間;設定為True,則取設定的timedelta時間

在task.py中設定了紀錄檔

from celery import shared_task
import logging  
logger = logging.getLogger(__name__))
 
 
@shared_task
def mul(x, y):
    logger.info('___mul__'*10)
    return x * y

2.啟動celery

(兩個cmd)分別啟動worker和beat

celery -A worker celery_study -l debug -P eventlet
celery beat -A celery_study -l debug

3.任務繫結

Celery可通過task繫結到範例獲取到task的上下文,這樣我們可以在task執行時候獲取到task的狀態,記錄相關紀錄檔等

方法:

  • 在裝飾器中加入引數 bind=True
  • 在task函數中的第一個引數設定為self

在task.py 裡面寫

from celery import shared_task
import logging  
logger = logging.getLogger(__name__)
 
 
# 任務繫結
@shared_task(bind=True)
def add(self,x, y):
    logger.info('add__-----'*10)
    logger.info('name:',self.name)
    logger.info('dir(self)',dir(self))
    return x + y

其中:self物件是celery.app.task.Task的範例,可以用於實現重試等多種功能

from celery import shared_task
import logging  
logger = logging.getLogger(__name__)
 
 
# 任務繫結
@shared_task(bind=True)
def add(self,x, y):
    try:
        logger.info('add__-----'*10)
        logger.info('name:',self.name)
        logger.info('dir(self)',dir(self))
        raise Exception
    except Exception as e:
        # 出錯每4秒嘗試一次,總共嘗試4次
        self.retry(exc=e, countdown=4, max_retries=4)    
    return x + y

啟動celery

celery -A worker celery_study -l debug -P eventlet

4.任務勾點

Celery在執行任務時,提供了勾點方法用於在任務執行完成時候進行對應的操作,在Task原始碼中提供了很多狀態勾點函數如:on_success(成功後執行)、on_failure(失敗時候執行)、on_retry(任務重試時候執行)、after_return(任務返回時候執行)

方法:通過繼承Task類,重寫對應方法即可,

from celery import Task
 
class MyHookTask(Task):
 
    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f'task id:{task_id} , arg:{args} , successful !')
 
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}')
 
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.info(f'task id:{task_id} , arg:{args} , retry !  erros: {exc}')
 
# 在對應的task函數的裝飾器中,通過 base=MyHookTask 指定
@shared_task(base=MyHookTask, bind=True)
def add(self,x, y):
    logger.info('add__-----'*10)
    logger.info('name:',self.name)
    logger.info('dir(self)',dir(self))
    return x + y

啟動celery

celery -A worker celery_study -l debug -P eventlet

5.任務編排

在很多情況下,一個任務需要由多個子任務或者一個任務需要很多步驟才能完成,Celery也能實現這樣的任務,完成這型別的任務通過以下模組完成:

  • group: 並行排程任務
  • chain: 鏈式任務排程
  • chord: 類似group,但分header和body2個部分,header可以是一個group任務,執行完成後呼叫body的任務
  • map: 對映排程,通過輸入多個入參來多次排程同一個任務
  • starmap: 類似map,入參類似*args
  • chunks: 將任務按照一定數量進行分組

檔案:Next Steps — Celery 5.2.0b3 documentation

1.group

urls.py:

path('primitive/', views.test_primitive),

views.py:

from .tasks import *
from celery import group
 
def test_primitive(request):
    # 建立10個並列的任務
    lazy_group = group(add.s(i, i) for i in range(10))
    promise = lazy_group()
    result = promise.get()
    return JsonResponse({'function': 'test_primitive', 'result': result})

說明:

通過task函數的 s 方法傳入引數,啟動任務

上面這種方法需要進行等待,如果依然想實現非同步的方式,那麼就必須在tasks.py中新建一個task方法,呼叫group,範例如下:

tasks.py:

@shared_task
def group_task(num):
    return group(add.s(i, i) for i in range(num))().get()

urls.py:

path('first_group/', views.first_group),

views.py:

def first_group(request):
    ar = tasks.group_task.delay(10)
 
    return HttpResponse('返回first_group任務,task_id:' + ar.task_id)

2.chain

預設上一個任務的結果作為下一個任務的第一個引數

def test_primitive(request):
    # 等同呼叫  mul(add(add(2, 2), 5), 8)
    promise = chain(tasks.add.s(2, 2), tasks.add.s(5), tasks.mul.s(8))()
    #  72
    result = promise.get()  
    return JsonResponse({'function': 'test_primitive', 'result': result})

3.chord

任務分割,分為header和body兩部分,hearder任務執行完在執行body,其中hearder返回結果作為引數傳遞給body

def test_primitive(request):
    # header:  [3, 12] 
    # body: xsum([3, 12])
    promise = chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())()
    result = promise.get()
    return JsonResponse({'function': 'test_primitive', 'result': result})

6、celery管理和監控

celery通過flower元件實現管理和監控功能 ,flower元件不僅僅提供監控功能,還提供HTTP API可實現對woker和task的管理

官網:flower · PyPI

檔案:Flower - Celery monitoring tool — Flower 1.0.1 documentation

安裝flower

pip install flower

啟動flower

flower -A celery_study --port=5555   

說明:

  • -A:專案名
  • --port: 埠號

存取

在瀏覽器輸入:http://127.0.0.1:5555

通過api操作

curl http://127.0.0.1:5555/api/workers

總結

到此這篇關於Django中celery使用專案的文章就介紹到這了,更多相關Django中celery使用內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com