<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
django框架請求/響應的過程是同步的,框架本身無法實現非同步響應。
但是我們在專案過程中會經常會遇到一些耗時的任務, 比如:傳送郵件、傳送簡訊、巨量資料統計等等,這些操作耗時長,同步執行對使用者體驗非常不友好,那麼在這種情況下就需要實現非同步執行。
非同步執行前端一般使用ajax,後端使用Celery。
django專案應用celery,主要有兩種任務方式,一是非同步任務(釋出者任務),一般是web請求,二是定時任務。
celery組成
Celery是由Python開發、簡單、靈活、可靠的分散式任務佇列,是一個處理非同步任務的框架,其本質是生產者消費者模型,生產者傳送任務到訊息佇列,消費者負責處理任務。Celery側重於實時操作,但對排程支援也很好,其每天可以處理數以百萬計的任務。特點:
簡單:熟悉celery的工作流程後,設定使用簡單
高可用:當任務執行失敗或執行過程中發生連線中斷,celery會自動嘗試重新執行任務
快速:一個單程序的celery每分鐘可處理上百萬個任務
靈活:幾乎celery的各個元件都可以被擴充套件及自客製化
Celery由三部分構成:
訊息中介軟體(Broker):官方提供了很多備選方案,支援RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推薦RabbitMQ
任務執行單元(Worker):任務執行單元,負責從訊息佇列中取出任務執行,它可以啟動一個或者多個,也可以啟動在不同的機器節點,這就是其實現分散式的核心
結果儲存(Backend):官方提供了諸多的儲存方式支援:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch等
架構如下:
工作原理:
任務模組Task包含非同步任務和定時任務。其中,非同步任務通常在業務邏輯中被觸發並行往訊息佇列,而定時任務由Celery Beat程序週期性地將任務發往訊息佇列;
任務執行單元Worker實時監視訊息佇列獲取佇列中的任務執行;
Woker執行完任務後將結果儲存在Backend中;
本文使用的是redis資料庫作為訊息中介軟體和結果儲存資料庫
1.安裝庫
pip install celery pip install redis
2.celery.py
在主專案目錄下,新建 celery.py 檔案:
import os import django from celery import Celery from django.conf import settings # 設定系統環境變數,安裝django,必須設定,否則在啟動celery時會報錯 # celery_study 是當前專案名 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_study.settings') django.setup() celery_app = Celery('celery_study') celery_app.config_from_object('django.conf:settings') celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
注意:是和settings.py檔案同目錄,一定不能建立在專案根目錄,不然會引起 celery 這個模組名的命名衝突
同時,在主專案的init.py中,新增如下程式碼:
from .celery import celery_app __all__ = ['celery_app']
3.settings.py
在組態檔中設定對應的redis設定:
# Broker設定,使用Redis作為訊息中介軟體 BROKER_URL = 'redis://127.0.0.1:6379/0' # BACKEND設定,這裡使用redis CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 結果序列化方案 CELERY_RESULT_SERIALIZER = 'json' # 任務結果過期時間,秒 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 時區設定 CELERY_TIMEZONE='Asia/Shanghai' # 指定匯入的任務模組,可以指定多個 #CELERY_IMPORTS = ( # 'other_dir.tasks', #)
注意:所有設定的官方檔案:Configuration and defaults — Celery 5.2.0b3 documentation
4.tasks.py
在子應用下建立各自對應的任務檔案tasks.py(必須是tasks.py這個名字,不允許修改)
from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)
5.呼叫任務
from .tasks import * # Create your views here. def task_add_view(request): add.delay(100,200) return HttpResponse(f'呼叫函數結果')
6.啟動celery
pip install eventlet
celery -A celery_study worker -l debug -P eventlet
注意 :celery_study是專案名
使用redis時,有可能會出現如下類似的異常
AttributeError: 'str' object has no attribute 'items'
這是由於版本差異,需要解除安裝已經安裝的python環境中的 redis 庫,重新指定安裝特定版本(celery4.x以下適用 redis2.10.6, celery4.3以上使用redis3.2.0以上):
xxxxxxxxxx pip install redis==2.10.6
7.獲取任務結果
在 views.py 中,通過 AsyncResult.get() 獲取結果
from celery import result def get_result_by_taskid(request): task_id = request.GET.get('task_id') # 非同步執行 ar = result.AsyncResult(task_id) if ar.ready(): return JsonResponse({'status': ar.state, 'result': ar.get()}) else: return JsonResponse({'status': ar.state, 'result': ''})
AsyncResult類的常用的屬性和方法:
在第一步的非同步任務的基礎上,進行部分修改即可
1.settings.py
from celery.schedules import crontab CELERYBEAT_SCHEDULE = { 'mul_every_30_seconds': { # 任務路徑 'task': 'celery_app.tasks.mul', # 每30秒執行一次 'schedule': 5, 'args': (14, 5) } }
說明(更多內容見檔案:Periodic Tasks — Celery 5.2.0b3 documentation):
在task.py中設定了紀錄檔
from celery import shared_task import logging logger = logging.getLogger(__name__)) @shared_task def mul(x, y): logger.info('___mul__'*10) return x * y
2.啟動celery
(兩個cmd)分別啟動worker和beat
celery -A worker celery_study -l debug -P eventlet celery beat -A celery_study -l debug
Celery可通過task繫結到範例獲取到task的上下文,這樣我們可以在task執行時候獲取到task的狀態,記錄相關紀錄檔等
方法:
在task.py 裡面寫
from celery import shared_task import logging logger = logging.getLogger(__name__) # 任務繫結 @shared_task(bind=True) def add(self,x, y): logger.info('add__-----'*10) logger.info('name:',self.name) logger.info('dir(self)',dir(self)) return x + y
其中:self物件是celery.app.task.Task的範例,可以用於實現重試等多種功能
from celery import shared_task import logging logger = logging.getLogger(__name__) # 任務繫結 @shared_task(bind=True) def add(self,x, y): try: logger.info('add__-----'*10) logger.info('name:',self.name) logger.info('dir(self)',dir(self)) raise Exception except Exception as e: # 出錯每4秒嘗試一次,總共嘗試4次 self.retry(exc=e, countdown=4, max_retries=4) return x + y
啟動celery
celery -A worker celery_study -l debug -P eventlet
Celery在執行任務時,提供了勾點方法用於在任務執行完成時候進行對應的操作,在Task原始碼中提供了很多狀態勾點函數如:on_success(成功後執行)、on_failure(失敗時候執行)、on_retry(任務重試時候執行)、after_return(任務返回時候執行)
方法:通過繼承Task類,重寫對應方法即可,
from celery import Task class MyHookTask(Task): def on_success(self, retval, task_id, args, kwargs): logger.info(f'task id:{task_id} , arg:{args} , successful !') def on_failure(self, exc, task_id, args, kwargs, einfo): logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}') def on_retry(self, exc, task_id, args, kwargs, einfo): logger.info(f'task id:{task_id} , arg:{args} , retry ! erros: {exc}') # 在對應的task函數的裝飾器中,通過 base=MyHookTask 指定 @shared_task(base=MyHookTask, bind=True) def add(self,x, y): logger.info('add__-----'*10) logger.info('name:',self.name) logger.info('dir(self)',dir(self)) return x + y
啟動celery
celery -A worker celery_study -l debug -P eventlet
在很多情況下,一個任務需要由多個子任務或者一個任務需要很多步驟才能完成,Celery也能實現這樣的任務,完成這型別的任務通過以下模組完成:
檔案:Next Steps — Celery 5.2.0b3 documentation
1.group
urls.py:
path('primitive/', views.test_primitive),
views.py:
from .tasks import * from celery import group def test_primitive(request): # 建立10個並列的任務 lazy_group = group(add.s(i, i) for i in range(10)) promise = lazy_group() result = promise.get() return JsonResponse({'function': 'test_primitive', 'result': result})
說明:
通過task函數的 s 方法傳入引數,啟動任務
上面這種方法需要進行等待,如果依然想實現非同步的方式,那麼就必須在tasks.py中新建一個task方法,呼叫group,範例如下:
tasks.py:
@shared_task def group_task(num): return group(add.s(i, i) for i in range(num))().get()
urls.py:
path('first_group/', views.first_group),
views.py:
def first_group(request): ar = tasks.group_task.delay(10) return HttpResponse('返回first_group任務,task_id:' + ar.task_id)
2.chain
預設上一個任務的結果作為下一個任務的第一個引數
def test_primitive(request): # 等同呼叫 mul(add(add(2, 2), 5), 8) promise = chain(tasks.add.s(2, 2), tasks.add.s(5), tasks.mul.s(8))() # 72 result = promise.get() return JsonResponse({'function': 'test_primitive', 'result': result})
3.chord
任務分割,分為header和body兩部分,hearder任務執行完在執行body,其中hearder返回結果作為引數傳遞給body
def test_primitive(request): # header: [3, 12] # body: xsum([3, 12]) promise = chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())() result = promise.get() return JsonResponse({'function': 'test_primitive', 'result': result})
celery通過flower元件實現管理和監控功能 ,flower元件不僅僅提供監控功能,還提供HTTP API可實現對woker和task的管理
檔案:Flower - Celery monitoring tool — Flower 1.0.1 documentation
安裝flower
pip install flower
啟動flower
flower -A celery_study --port=5555
說明:
存取
在瀏覽器輸入:http://127.0.0.1:5555
通過api操作
curl http://127.0.0.1:5555/api/workers
到此這篇關於Django中celery使用專案的文章就介紹到這了,更多相關Django中celery使用內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45