首頁 > 軟體

讓Python程式定時執行的8種方法整理

2023-02-01 18:01:01

在日常工作中,我們常常會用到需要週期性執行的任務,一種方式是採用 Linux 系統自帶的 crond 結合命令列實現,另外一種方式是直接使用Python。

最近我整理了一下 Python 定時任務的實現方式,內容較長,建議收藏後學習。

1. 利用while True: + sleep()實現定時任務

位於 time 模組中的 sleep(secs) 函數,可以實現令當前執行的執行緒暫停 secs 秒後再繼續執行。所謂暫停,即令當前執行緒進入阻塞狀態,當達到 sleep() 函數規定的時間後,再由阻塞狀態轉為就緒狀態,等待 CPU 排程。

基於這樣的特性我們可以通過while死迴圈+sleep()的方式實現簡單的定時任務。

程式碼範例:

import datetime
import time
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
def loop_monitor():
    while True:
        time_printer()
        time.sleep(5)  # 暫停5秒
if __name__ == "__main__":
    loop_monitor()

主要缺點:

只能設定間隔,不能指定具體的時間,比如每天早上8:00

sleep 是一個阻塞函數,也就是說 sleep 這一段時間,程式什麼也不能操作。

2. 使用Timeloop庫執行定時任務

Timeloop是一個庫,可用於執行多週期任務。這是一個簡單的庫,它使用decorator模式線上程中執行標記函數。

範例程式碼:

import time
from timeloop import Timeloop
from datetime import timedelta
tl = Timeloop()
@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
    print "2s job current time : {}".format(time.ctime())
@tl.job(interval=timedelta(seconds=5))
def sample_job_every_5s():
    print "5s job current time : {}".format(time.ctime())
@tl.job(interval=timedelta(seconds=10))
def sample_job_every_10s():
    print "10s job current time : {}".format(time.ctime())

3. 利用threading.Timer實現定時任務

threading 模組中的 Timer 是一個非阻塞函數,比 sleep 稍好一點,timer最基本理解就是定時器,我們可以啟動多個定時任務,這些定時器任務是非同步執行,所以不存在等待順序執行問題。

Timer(interval, function, args=[ ], kwargs={ })

  • interval: 指定的時間
  • function: 要執行的方法
  • args/kwargs: 方法的引數

程式碼範例:

備註:Timer只能執行一次,這裡需要回圈呼叫,否則只能執行一次

4. 利用內建模組sched實現定時任務

sched模組實現了一個通用事件排程器,在排程器類使用一個延遲函數等待特定的時間,執行任務。同時支援多執行緒應用程式,在每個任務執行後會立刻呼叫延時函數,以確保其他執行緒也能執行。

class sched.scheduler(timefunc, delayfunc)這個類定義了排程事件的通用介面,它需要外部傳入兩個引數,timefunc是一個沒有引數的返回時間型別數位的函數(常用使用的如time模組裡面的time),delayfunc應該是一個需要一個引數來呼叫、與timefunc的輸出相容、並且作用為延遲多個時間單位的函數(常用的如time模組的sleep)。

程式碼範例:

import datetime
import time
import sched
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    loop_monitor()
def loop_monitor():
    s = sched.scheduler(time.time, time.sleep)  # 生成排程器
    s.enter(5, 1, time_printer, ())
    s.run()
if __name__ == "__main__":
    loop_monitor()

scheduler物件主要方法:

  • enter(delay, priority, action, argument),安排一個事件來延遲delay個時間單位。
  • cancel(event):從佇列中刪除事件。如果事件不是當前佇列中的事件,則該方法將跑出一個ValueError。
  • run():執行所有預定的事件。這個函數將等待(使用傳遞給建構函式的delayfunc()函數),然後執行事件,直到不再有預定的事件。

個人點評:比threading.Timer更好,不需要回圈呼叫。

5. 利用排程模組schedule實現定時任務

schedule是一個第三方輕量級的任務排程模組,可以按照秒,分,小時,日期或者自定義事件執行時間。schedule允許使用者使用簡單、人性化的語法以預定的時間間隔定期執行Python函數(或其它可呼叫函數)。

先來看程式碼,是不是不看檔案就能明白什麼意思?

import schedule
import time
def job():
    print("I'm working...")
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
    schedule.run_pending()
    time.sleep(1)

裝飾器:通過 @repeat() 裝飾靜態方法

import time
from schedule import every, repeat, run_pending
@repeat(every().second)
def job():
    print('working...')
while True:
    run_pending()
    time.sleep(1)

傳遞引數:

import schedule
def greet(name):
    print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
while True:
    schedule.run_pending()

裝飾器同樣能傳遞引數:

from schedule import every, repeat, run_pending
@repeat(every().second, 'World')
@repeat(every().minute, 'Mars')
def hello(planet):
    print('Hello', planet)
while True:
    run_pending()

取消任務:

import schedule
i = 0
def some_task():
    global i
    i += 1
    print(i)
    if i == 10:
        schedule.cancel_job(job)
        print('cancel job')
        exit(0)
job = schedule.every().second.do(some_task)
while True:
    schedule.run_pending()

執行一次任務:

import time
import schedule
def job_that_executes_once():
    print('Hello')
    return schedule.CancelJob
schedule.every().minute.at(':34').do(job_that_executes_once)
while True:
    schedule.run_pending()
    time.sleep(1)

根據標籤檢索任務:

# 檢索所有任務:schedule.get_jobs()
import schedule
def greet(name):
    print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
print(friends)

根據標籤取消任務:

# 取消所有任務:schedule.clear()
import schedule
def greet(name):
    print('Hello {}'.format(name))
    if name == 'Cancel':
        schedule.clear('second-tasks')
        print('cancel second-tasks')
schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')
while True:
    schedule.run_pending()

執行任務到某時間:

import schedule
from datetime import datetime, timedelta, time
def job():
    print('working...')
schedule.every().second.until('23:59').do(job)  # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小時後停止
schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30停止
while True:
    schedule.run_pending()

馬上執行所有任務(主要用於測試):

import schedule
def job():
    print('working...')
def job1():
    print('Hello...')
schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3)  # 任務間延遲3秒

並行執行:使用 Python 內建佇列實現:

import threading
import time
import schedule
def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
    schedule.run_pending()
    time.sleep(1)

6. 利用任務框架APScheduler實現定時任務

APScheduler(advanceded python scheduler)基於Quartz的一個Python定時任務框架,實現了Quartz的所有功能,使用起來十分方便。提供了基於日期、固定時間間隔以及crontab型別的任務,並且可以持久化任務。基於這些功能,我們可以很方便的實現一個Python定時任務系統。

它有以下三個特點:

  • 類似於 Liunx Cron 的排程程式(可選的開始/結束時間)
  • 基於時間間隔的執行排程(週期性排程,可選的開始/結束時間)
  • 一次性執行任務(在設定的日期/時間執行一次任務)

APScheduler有四種組成部分:

  • 觸發器(trigger) 包含排程邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會執行。除了他們自己初始設定意外,觸發器完全是無狀態的。
  • 作業儲存(job store) 儲存被排程的作業,預設的作業儲存是簡單地把作業儲存在記憶體中,其他的作業儲存是將作業儲存在資料庫中。一個作業的資料講在儲存在持久化作業儲存時被序列化,並在載入時被反序列化。排程器不能分享同一個作業儲存。
  • 執行器(executor) 處理作業的執行,他們通常通過在作業中提交制定的可呼叫物件到一個執行緒或者進城池來進行。當作業完成時,執行器將會通知排程器。
  • 排程器(scheduler) 是其他的組成部分。你通常在應用只有一個排程器,應用的開發者通常不會直接處理作業儲存、排程器和觸發器,相反,排程器提供了處理這些的合適的介面。設定作業儲存和執行器可以在排程器中完成,例如新增、修改和移除作業。通過設定executor、jobstore、trigger,使用執行緒池(ThreadPoolExecutor預設值20)或程序池(ProcessPoolExecutor 預設值5)並且預設最多3個(max_instances)任務範例同時執行,實現對job的增刪改查等排程控制

範例程式碼:

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
# 輸出時間
def job():
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# BlockingScheduler
sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5, id='my_job_id')
sched.start()

6.1 Job 作業

Job作為APScheduler最小執行單位。建立Job時指定執行的函數,函數中所需引數,Job執行時的一些設定資訊。

構建說明:

  • id:指定作業的唯一ID
  • name:指定作業的名字
  • trigger:apscheduler定義的觸發器,用於確定Job的執行時間,根據設定的trigger規則,計算得到下次執行此job的時間, 滿足時將會執行
  • executor:apscheduler定義的執行器,job建立時設定執行器的名字,根據字串你名字到scheduler獲取到執行此job的 執行器,執行job指定的函數
  • max_instances:執行此job的最大範例數,executor執行job時,根據job的id來計算執行次數,根據設定的最大範例數來確定是否可執行
  • next_run_time:Job下次的執行時間,建立Job時可以指定一個時間[datetime],不指定的話則預設根據trigger獲取觸發時間
  • misfire_grace_time:Job的延遲執行時間,例如Job的計劃執行時間是21:00:00,但因服務重啟或其他原因導致21:00:31才執行,如果設定此key為40,則該job會繼續執行,否則將會丟棄此job
  • coalesce:Job是否合併執行,是一個bool值。例如scheduler停止20s後重啟啟動,而job的觸發器設定為5s執行一次,因此此job錯過了4個執行時間,如果設定為是,則會合併到一次執行,否則會逐個執行
  • func:Job執行的函數
  • args:Job執行函數需要的位置引數
  • kwargs:Job執行函數需要的關鍵字引數

6.2 Trigger 觸發器

Trigger繫結到Job,在scheduler排程篩選Job時,根據觸發器的規則計算出Job的觸發時間,然後與當前時間比較確定此Job是否會被執行,總之就是根據trigger規則計算出下一個執行時間。

目前APScheduler支援觸發器:

  • 指定時間的DateTrigger
  • 指定間隔時間的IntervalTrigger
  • 像Linux的crontab一樣的CronTrigger。

觸發器引數:date

date定時,作業只執行一次。

  • run_date (datetime|str) – the date/time to run the job at
  • timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])

觸發器引數:interval

interval間隔排程

  • weeks (int) – 間隔幾周
  • days (int) – 間隔幾天
  • hours (int) – 間隔幾小時
  • minutes (int) – 間隔幾分鐘
  • seconds (int) – 間隔多少秒
  • start_date (datetime|str) – 開始日期
  • end_date (datetime|str) – 結束日期
  • timezone (datetime.tzinfo|str) – 時區
  • sched.add_job(job_function, 'interval', hours=2)

觸發器引數:cron

cron排程

  • (int|str) 表示引數既可以是int型別,也可以是str型別
  • (datetime | str) 表示引數既可以是datetime型別,也可以是str型別
  • year (int|str) – 4-digit year -(表示四位數的年份,如2008年)
  • month (int|str) – month (1-12) -(表示取值範圍為1-12月)
  • day (int|str) – day of the (1-31) -(表示取值範圍為1-31日)
  • week (int|str) – ISO week (1-53) -(格里曆2006年12月31日可以寫成2006年-W52-7(擴充套件形式)或2006W527(緊湊形式))
  • day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) – (表示一週中的第幾天,既可以用0-6表示也可以用其英語縮寫表示)
  • hour (int|str) – hour (0-23) – (表示取值範圍為0-23時)
  • minute (int|str) – minute (0-59) – (表示取值範圍為0-59分)
  • second (int|str) – second (0-59) – (表示取值範圍為0-59秒)
  • start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) – (表示開始時間)
  • end_date (datetime|str) – latest possible date/time to trigger on (inclusive) – (表示結束時間)
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示時區取值)

CronTrigger可用的表示式:

# 6-8,11-12月第三個週五 00:00, 01:00, 02:00, 03:00執行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 每週一到週五執行 直到2024-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'

6.3 Executor 執行器

Executor在scheduler中初始化,另外也可通過scheduler的add_executor動態新增Executor。每個executor都會繫結一個alias,這個作為唯一標識繫結到Job,在實際執行時會根據Job繫結的executor找到實際的執行器物件,然後根據執行器物件執行Job。

Executor的種類會根據不同的排程來選擇,如果選擇AsyncIO作為排程的庫,那麼選擇AsyncIOExecutor,如果選擇tornado作為排程的庫,選擇TornadoExecutor,如果選擇啟動程序作為排程,選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以。

Executor的選擇需要根據實際的scheduler來選擇不同的執行器。目前APScheduler支援的Executor:

  • executors.asyncio:同步io,阻塞
  • executors.gevent:io多路複用,非阻塞
  • executors.pool: 執行緒ThreadPoolExecutor和程序ProcessPoolExecutor
  • executors.twisted:基於事件驅動

6.4 Jobstore 作業儲存

Jobstore在scheduler中初始化,另外也可通過scheduler的add_jobstore動態新增Jobstore。每個jobstore都會繫結一個alias,scheduler在Add Job時,根據指定的jobstore在scheduler中找到相應的jobstore,並將job新增到jobstore中。作業記憶體決定任務的儲存方式, 預設儲存在記憶體中(MemoryJobStore),重啟後就沒有了。APScheduler支援的任務記憶體有:

  • jobstores.memory:記憶體
  • jobstores.mongodb:儲存在mongodb
  • jobstores.redis:儲存在redis
  • jobstores.rethinkdb:儲存在rethinkdb
  • jobstores.sqlalchemy:支援sqlalchemy的資料庫如mysql,sqlite等
  • jobstores.zookeeper:zookeeper

不同的任務記憶體可以在排程器的設定中進行設定(見排程器)

6.5 Event 事件

Event是APScheduler在進行某些操作時觸發相應的事件,使用者可以自定義一些函數來監聽這些事件,當觸發某些Event時,做一些具體的操作。常見的比如。Job執行異常事件 EVENT_JOB_ERROR。Job執行時間錯過事件 EVENT_JOB_MISSED。

目前APScheduler定義的Event:

  • EVENT_SCHEDULER_STARTED
  • EVENT_SCHEDULER_START
  • EVENT_SCHEDULER_SHUTDOWN
  • EVENT_SCHEDULER_PAUSED
  • EVENT_SCHEDULER_RESUMED
  • EVENT_EXECUTOR_ADDED
  • EVENT_EXECUTOR_REMOVED
  • EVENT_JOBSTORE_ADDED
  • EVENT_JOBSTORE_REMOVED
  • EVENT_ALL_JOBS_REMOVED
  • EVENT_JOB_ADDED
  • EVENT_JOB_REMOVED
  • EVENT_JOB_MODIFIED
  • EVENT_JOB_EXECUTED
  • EVENT_JOB_ERROR
  • EVENT_JOB_MISSED
  • EVENT_JOB_SUBMITTED
  • EVENT_JOB_MAX_INSTANCES

Listener表示使用者自定義監聽的一些Event,比如當Job觸發了EVENT_JOB_MISSED事件時可以根據需求做一些其他處理。

6.6 排程器

Scheduler是APScheduler的核心,所有相關元件通過其定義。scheduler啟動之後,將開始按照設定的任務進行排程。除了依據所有定義Job的trigger生成的將要排程時間喚醒排程之外。當發生Job資訊變更時也會觸發排程。

APScheduler支援的排程器方式如下,比較常用的為BlockingScheduler和BackgroundScheduler

  • BlockingScheduler:適用於排程程式是程序中唯一執行的程序,呼叫start函數會阻塞當前執行緒,不能立即返回。
  • BackgroundScheduler:適用於排程程式在應用程式的後臺執行,呼叫start後主執行緒不會阻塞。
  • AsyncIOScheduler:適用於使用了asyncio模組的應用程式。
  • GeventScheduler:適用於使用gevent模組的應用程式。
  • TwistedScheduler:適用於構建Twisted的應用程式。
  • QtScheduler:適用於構建Qt的應用程式。

6.7 Scheduler的工作流程

Scheduler新增job流程:

Scheduler排程流程:

7. 使用分散式訊息系統Celery實現定時任務

Celery是一個簡單,靈活,可靠的分散式系統,用於處理大量訊息,同時為操作提供維護此類系統所需的工具, 也可用於任務排程。Celery 的設定比較麻煩,如果你只是需要一個輕量級的排程工具,Celery 不會是一個好選擇。

Celery 是一個強大的分散式任務佇列,它可以讓任務的執行完全脫離主程式,甚至可以被分配到其他主機上執行。我們通常使用它來實現非同步任務(async task)和定時任務(crontab)。非同步任務比如是傳送郵件、或者檔案上傳, 影象處理等等一些比較耗時的操作 ,定時任務是需要在特定時間執行的任務。

需要注意,celery本身並不具備任務的儲存功能,在排程任務的時候肯定是要把任務存起來的,因此在使用celery的時候還需要搭配一些具備儲存、存取功能的工具,比如:訊息佇列、Redis快取、資料庫等。官方推薦的是訊息佇列RabbitMQ,有些時候使用Redis也是不錯的選擇。

它的架構組成如下圖:

Celery架構,它採用典型的生產者-消費者模式,主要由以下部分組成:

  • Celery Beat,任務排程器,Beat程序會讀取組態檔的內容,週期性地將設定中到期需要執行的任務傳送給任務佇列。
  • Producer:需要在佇列中進行的任務,一般由使用者、觸發器或其他操作將任務入隊,然後交由workers進行處理。呼叫了Celery提供的API、函數或者裝飾器而產生任務並交給任務佇列處理的都是任務生產者。
  • Broker,即訊息中介軟體,在這指任務佇列本身,Celery扮演生產者和消費者的角色,brokers就是生產者和消費者存放/獲取產品的地方(佇列)。
  • Celery Worker,執行任務的消費者,從佇列中取出任務並執行。通常會在多臺伺服器執行多個消費者來提高執行效率。
  • Result Backend:任務處理完後儲存狀態資訊和結果,以供查詢。Celery預設已支援Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

實際應用中,使用者從Web前端發起一個請求,我們只需要將請求所要處理的任務丟入任務佇列broker中,由空閒的worker去處理任務即可,處理的結果會暫存在後臺資料庫backend中。我們可以在一臺機器或多臺機器上同時起多個worker程序來實現分散式地並行處理任務。

Celery定時任務範例:

  • Python Celery & RabbitMQ Tutorial
  • Celery 設定實踐筆記

8. 使用資料流工具Apache Airflow實現定時任務

Apache Airflow 是Airbnb開源的一款資料流程工具,目前是Apache孵化專案。以非常靈活的方式來支援資料的ETL過程,同時還支援非常多的外掛來完成諸如HDFS監控、郵件通知等功能。Airflow支援單機和分散式兩種模式,支援Master-Slave模式,支援Mesos等資源排程,有非常好的擴充套件性。被大量公司採用。

Airflow使用Python開發,它通過DAGs(Directed Acyclic Graph, 有向無環圖)來表達一個工作流中所要執行的任務,以及任務之間的關係和依賴。比如,如下的工作流中,任務T1執行完成,T2和T3才能開始執行,T2和T3都執行完成,T4才能開始執行。

Airflow提供了各種Operator實現,可以完成各種任務實現:

  • BashOperator – 執行 bash 命令或指令碼。
  • SSHOperator – 執行遠端 bash 命令或指令碼(原理同 paramiko 模組)。
  • PythonOperator – 執行 Python 函數。
  • EmailOperator – 傳送 Email。
  • HTTPOperator – 傳送一個 HTTP 請求。
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,執行 SQL 任務。
  • DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…

除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務需求。

一些情況下,我們需要根據執行結果執行不同的任務,這樣工作流會產生分支。如:

這種需求可以使用BranchPythonOperator來實現。

 8.1 Airflow 產生的背景

通常,在一個運維繫統,資料分析系統,或測試系統等大型系統中,我們會有各種各樣的依賴需求。包括但不限於:

  • 時間依賴:任務需要等待某一個時間點觸發。
  • 外部系統依賴:任務依賴外部系統需要呼叫介面去存取。
  • 任務間依賴:任務 A 需要在任務 B 完成後啟動,兩個任務互相間會產生影響。
  • 資源環境依賴:任務消耗資源非常多, 或者只能在特定的機器上執行。

crontab 可以很好地處理定時執行任務的需求,但僅能管理時間上的依賴。Airflow 的核心概念 DAG(有向無環圖)—— 來表現工作流。

  • Airflow 是一種 WMS,即:它將任務以及它們的依賴看作程式碼,按照那些計劃規範任務執行,並在實際工作程序之間分發需執行的任務。
  • Airflow 提供了一個用於顯示當前活動任務和過去任務狀態的優秀 UI,並允許使用者手動管理任務的執行和狀態。
  • Airflow 中的工作流是具有方向性依賴的任務集合。
  • DAG 中的每個節點都是一個任務,DAG 中的邊表示的是任務之間的依賴(強制為有向無環,因此不會出現迴圈依賴,從而導致無限執行迴圈)。

 8.2 Airflow 核心概念

DAGs:即有向無環圖(Directed Acyclic Graph),將所有需要執行的tasks按照依賴關係組織起來,描述的是所有tasks執行順序。

Operators:可以簡單理解為一個class,描述了DAG中某個的task具體要做的事。其中,airflow內建了很多operators,如BashOperator 執行一個bash 命令,PythonOperator 呼叫任意的Python 函數,EmailOperator 用於傳送郵件,HTTPOperator 用於傳送HTTP請求, SqlOperator 用於執行SQL命令等等,同時,使用者可以自定義Operator,這給使用者提供了極大的便利性。

Tasks:Task 是 Operator的一個範例,也就是DAGs中的一個node。

Task Instance:task的一次執行。Web 介面中可以看到task instance 有自己的狀態,包括”running”, “success”, “failed”, “skipped”, “up for retry”等。

Task Relationships:DAGs中的不同Tasks之間可以有依賴關係,如 Task1 >> Task2,表明Task2依賴於Task2了。通過將DAGs和Operators結合起來,使用者就可以建立各種複雜的 工作流(workflow)。

 8.3 Airflow 的架構

在一個可延伸的生產環境中,Airflow 含有以下元件:

後設資料庫:這個資料庫儲存有關任務狀態的資訊。

排程器:Scheduler 是一種使用 DAG 定義結合後設資料中的任務狀態來決定哪些任務需要被執行以及任務執行優先順序的過程。排程器通常作為服務執行。

執行器:Executor 是一個訊息佇列程序,它被繫結到排程器中,用於確定實際執行每個任務計劃的工作程序。有不同型別的執行器,每個執行器都使用一個指定工作程序的類來執行任務。例如,LocalExecutor 使用與排程器程序在同一臺機器上執行的並行程序執行任務。其他像 CeleryExecutor 的執行器使用存在於獨立的工作機器叢集中的工作程序執行任務。

Workers:這些是實際執行任務邏輯的程序,由正在使用的執行器確定。

Worker的具體實現由組態檔中的executor來指定,airflow支援多種Executor:

  • SequentialExecutor: 單程序順序執行,一般只用來測試
  • LocalExecutor: 本地多程序執行
  • CeleryExecutor: 使用Celery進行分散式任務排程
  • DaskExecutor:使用Dask進行分散式任務排程
  • KubernetesExecutor: 1.10.0新增, 建立臨時POD執行每次任務

生產環境一般使用CeleryExecutor和KubernetesExecutor。

使用CeleryExecutor的架構如圖:

使用KubernetesExecutor的架構如圖:

以上就是讓Python程式定時執行的8種方法整理的詳細內容,更多關於Python程式定時執行的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com