首頁 > 軟體

Python flask框架定時任務apscheduler應用介紹

2022-10-12 14:00:37

flask-apschedulerapscheduler移植到了flask應用中,使得在flask中可以非常方便的使用定時任務了,除此之外,它還有如下幾個特性

  • 根據Flask設定載入排程器設定
  • 根據Flask設定載入任務排程器
  • 允許指定伺服器執行任務
  • 提供RESTful API管理任務,也就是遠端管理任務
  • RESTful API提供認證

下載安裝

pip install flask-apscheduler

基本使用

flask-apscheduler的相關設定,我們會將它和其它擴充套件一起,放在應用的設定裡

class Config(object):
    // 設定項
    JOBS = [
        {
            'id': 'job1',
            'func': 'run:add',
            'args': (1, 2),
            'trigger': 'interval',
            'seconds': 3
        }
    ]
    SCHEDULER_API_ENABLED = True
def add(a, b):
    print(a+b)

JOBS列表的每一個元素表示一個定時任務,列子中只有一個interval任務,表示每隔3秒執行一次函數add。func指定呼叫的函數,args表示傳入函數的引數,trigger表示啟動方式,常用的有兩種,分別是trigger和cron。

上邊我們設定了SCHEDULER_API_ENABLED = True,可以通過存取http://127.0.0.1:5000/scheduler,其中scheduler是預設的RESTful API字首

通過檢視原始碼,可以發現flask-apscheduler提供了以下的介面

def _load_api(self):
    """
    Add the routes for the scheduler API.
    """
    self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
    self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
    self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
    self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
    self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
    self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
    self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
    self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
    self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')

如果需要檢視當前執行的所有定時任務,則請求http://127.0.0.1:5000/scheduler/jobs即可。

trigger啟動方式

trigger表示間隔啟動,在trigger方式中,使用seconds設定間隔多久啟動一次,單位是秒。

cron啟動方式

cron表示定時啟動

class Config(object):
    JOBS = [
        {
            'id': 'job1',
            'func': 'scheduler:task',
            'args': (1, 2),
            'trigger': 'cron',
            'day': '*',
            'hour': '13',
            'minute': '16',
            'second': '20'
        }
    ]
    SCHEDULER_API_ENABLED = True
def task(a, b):
    print(str(datetime.datetime.now()) + ' execute task ' + '{}+{}={}'.format(a, b, a + b))

該設定項則表示每天的13點16分20秒啟動一次。*表示全部。

有關常用的cron設定有:

day

  • 表示天

hour

  • 表示小時

minute

  • 表示分鐘

second

  • 表示秒

week

day_of_week

  • 星期幾,如星期天使用sun,星期五使用fri,其他的類似。

使用裝飾器定時啟動任務

from flask import Flask
from flask_apscheduler import APScheduler
import datetime
class Config(object):
    SCHEDULER_API_ENABLED = True
scheduler = APScheduler()
# interval examples
@scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900)
def job1():
    print(str(datetime.datetime.now()) + ' Job 1 executed')

表示每隔30秒呼叫一次job1函數。

到此這篇關於Python flask框架定時任務apscheduler應用介紹的文章就介紹到這了,更多相關Python flask apscheduler內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com