<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在日常工作中,我們常常會用到需要週期性執行的任務,一種方式是採用 Linux 系統自帶的 crond 結合命令列實現。另外一種方式是直接使用Python。接下里整理的是常見的Python定時任務的實現方式。
位於 time 模組中的 sleep(secs) 函數,可以實現令當前執行的執行緒暫停 secs 秒後再繼續執行。所謂暫停,即令當前執行緒進入阻塞狀態,當達到 sleep() 函數規定的時間後,再由阻塞狀態轉為就緒狀態,等待 CPU 排程。
基於這樣的特性我們可以通過while死迴圈+sleep()的方式實現簡單的定時任務。
程式碼範例:
import datetime import time def time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) def loop_monitor(): while True: time_printer() time.sleep(5) # 暫停5秒 if __name__ == "__main__": loop_monitor()
主要缺點:
Timeloop是一個庫,可用於執行多週期任務。這是一個簡單的庫,它使用decorator模式線上程中執行標記函數。
範例程式碼:
import time from timeloop import Timeloop from datetime import timedelta tl = Timeloop() @tl.job(interval=timedelta(seconds=2)) def sample_job_every_2s(): print "2s job current time : {}".format(time.ctime()) @tl.job(interval=timedelta(seconds=5)) def sample_job_every_5s(): print "5s job current time : {}".format(time.ctime()) @tl.job(interval=timedelta(seconds=10)) def sample_job_every_10s(): print "10s job current time : {}".format(time.ctime())
threading 模組中的 Timer 是一個非阻塞函數,比 sleep 稍好一點,timer最基本理解就是定時器,我們可以啟動多個定時任務,這些定時器任務是非同步執行,所以不存在等待順序執行問題。
Timer(interval, function, args=[ ], kwargs={ })
程式碼範例:
import datetime from threading import Timer def time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) loop_monitor() def loop_monitor(): t = Timer(5, time_printer) t.start() if __name__ == "__main__": loop_monitor()
備註:Timer只能執行一次,這裡需要回圈呼叫,否則只能執行一次
sched模組實現了一個通用事件排程器,在排程器類使用一個延遲函數等待特定的時間,執行任務。同時支援多執行緒應用程式,在每個任務執行後會立刻呼叫延時函數,以確保其他執行緒也能執行。
class sched.scheduler(timefunc, delayfunc)這個類定義了排程事件的通用介面,它需要外部傳入兩個引數,timefunc是一個沒有引數的返回時間型別數位的函數(常用使用的如time模組裡面的time),delayfunc應該是一個需要一個引數來呼叫、與timefunc的輸出相容、並且作用為延遲多個時間單位的函數(常用的如time模組的sleep)。
程式碼範例:
import datetime import time import sched def time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) loop_monitor() def loop_monitor(): s = sched.scheduler(time.time, time.sleep) # 生成排程器 s.enter(5, 1, time_printer, ()) s.run() if __name__ == "__main__": loop_monitor()
scheduler物件主要方法:
個人點評:比threading.Timer更好,不需要回圈呼叫。
schedule是一個第三方輕量級的任務排程模組,可以按照秒,分,小時,日期或者自定義事件執行時間。schedule允許使用者使用簡單、人性化的語法以預定的時間間隔定期執行Python函數(或其它可呼叫函數)。
先來看程式碼,是不是不看檔案就能明白什麼意思?
import schedule import time def job(): print("I'm working...") schedule.every(10).seconds.do(job) schedule.every(10).minutes.do(job) schedule.every().hour.do(job) schedule.every().day.at("10:30").do(job) schedule.every(5).to(10).minutes.do(job) schedule.every().monday.do(job) schedule.every().wednesday.at("13:15").do(job) schedule.every().minute.at(":17").do(job) while True: schedule.run_pending() time.sleep(1)
裝飾器:通過 @repeat() 裝飾靜態方法
import time from schedule import every, repeat, run_pending @repeat(every().second) def job(): print('working...') while True: run_pending() time.sleep(1)
傳遞引數:
import schedule def greet(name): print('Hello', name) schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob') while True: schedule.run_pending()
裝飾器同樣能傳遞引數:
from schedule import every, repeat, run_pending @repeat(every().second, 'World') @repeat(every().minute, 'Mars') def hello(planet): print('Hello', planet) while True: run_pending()
取消任務:
import schedule i = 0 def some_task(): global i i += 1 print(i) if i == 10: schedule.cancel_job(job) print('cancel job') exit(0) job = schedule.every().second.do(some_task) while True: schedule.run_pending()
執行一次任務:
import time import schedule def job_that_executes_once(): print('Hello') return schedule.CancelJob schedule.every().minute.at(':34').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1)
根據標籤檢索任務:
# 檢索所有任務:schedule.get_jobs() import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') friends = schedule.get_jobs('friend') print(friends)
根據標籤取消任務:
# 取消所有任務:schedule.clear() import schedule def greet(name): print('Hello {}'.format(name)) if name == 'Cancel': schedule.clear('second-tasks') print('cancel second-tasks') schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend') schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest') while True: schedule.run_pending()
執行任務到某時間:
import schedule from datetime import datetime, timedelta, time def job(): print('working...') schedule.every().second.until('23:59').do(job) # 今天23:59停止 schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止 schedule.every().second.until(timedelta(hours=8)).do(job) # 8小時後停止 schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止 schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止 while True: schedule.run_pending()
馬上執行所有任務(主要用於測試):
import schedule def job(): print('working...') def job1(): print('Hello...') schedule.every().monday.at('12:40').do(job) schedule.every().tuesday.at('16:40').do(job1) schedule.run_all() schedule.run_all(delay_seconds=3) # 任務間延遲3秒
並行執行:使用 Python 內建佇列實現:
import threading import time import schedule def job1(): print("I'm running on thread %s" % threading.current_thread()) def job2(): print("I'm running on thread %s" % threading.current_thread()) def job3(): print("I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job1) schedule.every(10).seconds.do(run_threaded, job2) schedule.every(10).seconds.do(run_threaded, job3) while True: schedule.run_pending() time.sleep(1)
APScheduler(advanceded python scheduler)基於Quartz的一個Python定時任務框架,實現了Quartz的所有功能,使用起來十分方便。提供了基於日期、固定時間間隔以及crontab型別的任務,並且可以持久化任務。基於這些功能,我們可以很方便的實現一個Python定時任務系統。
它有以下三個特點:
APScheduler有四種組成部分:
範例程式碼:
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime # 輸出時間 def job(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # BlockingScheduler sched = BlockingScheduler() sched.add_job(my_job, 'interval', seconds=5, id='my_job_id') sched.start()
Job作為APScheduler最小執行單位。建立Job時指定執行的函數,函數中所需引數,Job執行時的一些設定資訊。
構建說明:
Trigger繫結到Job,在scheduler排程篩選Job時,根據觸發器的規則計算出Job的觸發時間,然後與當前時間比較確定此Job是否會被執行,總之就是根據trigger規則計算出下一個執行時間。
目前APScheduler支援觸發器:
觸發器引數:date
date定時,作業只執行一次。
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text']) sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])
觸發器引數:interval
interval間隔排程
sched.add_job(job_function, 'interval', hours=2)
觸發器引數:cron
cron排程
CronTrigger可用的表示式:
表示式 引數型別 描述 * 所有 萬用字元。例:minutes=*即每分鐘觸發 * / a 所有 每隔時長a執行一次。例:minutes=”* / 3″ 即每隔3分鐘執行一次 a – b 所有 a – b的範圍內觸發。例:minutes=“2-5”。即2到5分鐘內每分鐘執行一次 a – b / c 所有 a – b範圍內,每隔時長c執行一次。 xth y 日 第幾個星期幾觸發。x為第幾個,y為星期幾 last x 日 一個月中,最後一個星期的星期幾觸發 last 日 一個月中的最後一天觸發 x, y, z 所有 組合表示式,可以組合確定值或上述表示式
# 6-8,11-12月第三個週五 00:00, 01:00, 02:00, 03:00執行 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 每週一到週五執行 直到2024-05-30 00:00:00 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'
Executor在scheduler中初始化,另外也可通過scheduler的add_executor動態新增Executor。每個executor都會繫結一個alias,這個作為唯一標識繫結到Job,在實際執行時會根據Job繫結的executor找到實際的執行器物件,然後根據執行器物件執行Job。
Executor的種類會根據不同的排程來選擇,如果選擇AsyncIO作為排程的庫,那麼選擇AsyncIOExecutor,如果選擇tornado作為排程的庫,選擇TornadoExecutor,如果選擇啟動程序作為排程,選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以。
Executor的選擇需要根據實際的scheduler來選擇不同的執行器。目前APScheduler支援的Executor:
Jobstore在scheduler中初始化,另外也可通過scheduler的add_jobstore動態新增Jobstore。每個jobstore都會繫結一個alias,scheduler在Add Job時,根據指定的jobstore在scheduler中找到相應的jobstore,並將job新增到jobstore中。作業記憶體決定任務的儲存方式, 預設儲存在記憶體中(MemoryJobStore),重啟後就沒有了。APScheduler支援的任務記憶體有:
不同的任務記憶體可以在排程器的設定中進行設定(見排程器)
Event是APScheduler在進行某些操作時觸發相應的事件,使用者可以自定義一些函數來監聽這些事件,當觸發某些Event時,做一些具體的操作。常見的比如。Job執行異常事件 EVENT_JOB_ERROR。Job執行時間錯過事件 EVENT_JOB_MISSED。
目前APScheduler定義的Event:
Listener表示使用者自定義監聽的一些Event,比如當Job觸發了EVENT_JOB_MISSED事件時可以根據需求做一些其他處理。
Scheduler是APScheduler的核心,所有相關元件通過其定義。scheduler啟動之後,將開始按照設定的任務進行排程。除了依據所有定義Job的trigger生成的將要排程時間喚醒排程之外。當發生Job資訊變更時也會觸發排程。
APScheduler支援的排程器方式如下,比較常用的為BlockingScheduler和BackgroundScheduler
Scheduler新增job流程:
Scheduler排程流程:
Celery是一個簡單,靈活,可靠的分散式系統,用於處理大量訊息,同時為操作提供維護此類系統所需的工具, 也可用於任務排程。Celery 的設定比較麻煩,如果你只是需要一個輕量級的排程工具,Celery 不會是一個好選擇。
Celery 是一個強大的分散式任務佇列,它可以讓任務的執行完全脫離主程式,甚至可以被分配到其他主機上執行。我們通常使用它來實現非同步任務(async task)和定時任務(crontab)。 非同步任務比如是傳送郵件、或者檔案上傳, 影象處理等等一些比較耗時的操作 ,定時任務是需要在特定時間執行的任務。
需要注意,celery本身並不具備任務的儲存功能,在排程任務的時候肯定是要把任務存起來的,因此在使用celery的時候還需要搭配一些具備儲存、存取功能的工具,比如:訊息佇列、Redis快取、資料庫等。官方推薦的是訊息佇列RabbitMQ,有些時候使用Redis也是不錯的選擇。
它的架構組成如下圖:
Celery架構,它採用典型的生產者-消費者模式,主要由以下部分組成:
實際應用中,使用者從Web前端發起一個請求,我們只需要將請求所要處理的任務丟入任務佇列broker中,由空閒的worker去處理任務即可,處理的結果會暫存在後臺資料庫backend中。我們可以在一臺機器或多臺機器上同時起多個worker程序來實現分散式地並行處理任務。
Celery定時任務範例:
Apache Airflow 是Airbnb開源的一款資料流程工具,目前是Apache孵化專案。以非常靈活的方式來支援資料的ETL過程,同時還支援非常多的外掛來完成諸如HDFS監控、郵件通知等功能。Airflow支援單機和分散式兩種模式,支援Master-Slave模式,支援Mesos等資源排程,有非常好的擴充套件性。被大量公司採用。
Airflow使用Python開發,它通過DAGs(Directed Acyclic Graph, 有向無環圖)來表達一個工作流中所要執行的任務,以及任務之間的關係和依賴。比如,如下的工作流中,任務T1執行完成,T2和T3才能開始執行,T2和T3都執行完成,T4才能開始執行。
Airflow提供了各種Operator實現,可以完成各種任務實現:
除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務需求。
一些情況下,我們需要根據執行結果執行不同的任務,這樣工作流會產生分支。如:
這種需求可以使用BranchPythonOperator來實現。
通常,在一個運維繫統,資料分析系統,或測試系統等大型系統中,我們會有各種各樣的依賴需求。包括但不限於:
crontab 可以很好地處理定時執行任務的需求,但僅能管理時間上的依賴。Airflow 的核心概念 DAG(有向無環圖)—— 來表現工作流。
在一個可延伸的生產環境中,Airflow 含有以下元件:
Worker的具體實現由組態檔中的executor來指定,airflow支援多種Executor:
生產環境一般使用CeleryExecutor和KubernetesExecutor。
使用CeleryExecutor的架構如圖:
使用KubernetesExecutor的架構如圖:
以上就是Python實現定時任務的八種方案詳解的詳細內容,更多關於Python定時任務的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45