<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在日常工作中,我們常常會用到需要週期性執行的任務,一種方式是採用 Linux 系統自帶的 crond 結合命令列實現,另外一種方式是直接使用Python。
最近我整理了一下 Python 定時任務的實現方式,內容較長,建議收藏後學習。
位於 time 模組中的 sleep(secs) 函數,可以實現令當前執行的執行緒暫停 secs 秒後再繼續執行。所謂暫停,即令當前執行緒進入阻塞狀態,當達到 sleep() 函數規定的時間後,再由阻塞狀態轉為就緒狀態,等待 CPU 排程。
基於這樣的特性我們可以通過while死迴圈+sleep()的方式實現簡單的定時任務。
程式碼範例:
import datetime import time def time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) def loop_monitor(): while True: time_printer() time.sleep(5) # 暫停5秒 if __name__ == "__main__": loop_monitor()
主要缺點:
只能設定間隔,不能指定具體的時間,比如每天早上8:00
sleep 是一個阻塞函數,也就是說 sleep 這一段時間,程式什麼也不能操作。
Timeloop是一個庫,可用於執行多週期任務。這是一個簡單的庫,它使用decorator模式線上程中執行標記函數。
範例程式碼:
import time from timeloop import Timeloop from datetime import timedelta tl = Timeloop() @tl.job(interval=timedelta(seconds=2)) def sample_job_every_2s(): print "2s job current time : {}".format(time.ctime()) @tl.job(interval=timedelta(seconds=5)) def sample_job_every_5s(): print "5s job current time : {}".format(time.ctime()) @tl.job(interval=timedelta(seconds=10)) def sample_job_every_10s(): print "10s job current time : {}".format(time.ctime())
threading 模組中的 Timer 是一個非阻塞函數,比 sleep 稍好一點,timer最基本理解就是定時器,我們可以啟動多個定時任務,這些定時器任務是非同步執行,所以不存在等待順序執行問題。
Timer(interval, function, args=[ ], kwargs={ })
程式碼範例:
備註:Timer只能執行一次,這裡需要回圈呼叫,否則只能執行一次
sched模組實現了一個通用事件排程器,在排程器類使用一個延遲函數等待特定的時間,執行任務。同時支援多執行緒應用程式,在每個任務執行後會立刻呼叫延時函數,以確保其他執行緒也能執行。
class sched.scheduler(timefunc, delayfunc)這個類定義了排程事件的通用介面,它需要外部傳入兩個引數,timefunc是一個沒有引數的返回時間型別數位的函數(常用使用的如time模組裡面的time),delayfunc應該是一個需要一個引數來呼叫、與timefunc的輸出相容、並且作用為延遲多個時間單位的函數(常用的如time模組的sleep)。
程式碼範例:
import datetime import time import sched def time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) loop_monitor() def loop_monitor(): s = sched.scheduler(time.time, time.sleep) # 生成排程器 s.enter(5, 1, time_printer, ()) s.run() if __name__ == "__main__": loop_monitor()
scheduler物件主要方法:
個人點評:比threading.Timer更好,不需要回圈呼叫。
schedule是一個第三方輕量級的任務排程模組,可以按照秒,分,小時,日期或者自定義事件執行時間。schedule允許使用者使用簡單、人性化的語法以預定的時間間隔定期執行Python函數(或其它可呼叫函數)。
先來看程式碼,是不是不看檔案就能明白什麼意思?
import schedule import time def job(): print("I'm working...") schedule.every(10).seconds.do(job) schedule.every(10).minutes.do(job) schedule.every().hour.do(job) schedule.every().day.at("10:30").do(job) schedule.every(5).to(10).minutes.do(job) schedule.every().monday.do(job) schedule.every().wednesday.at("13:15").do(job) schedule.every().minute.at(":17").do(job) while True: schedule.run_pending() time.sleep(1)
裝飾器:通過 @repeat() 裝飾靜態方法
import time from schedule import every, repeat, run_pending @repeat(every().second) def job(): print('working...') while True: run_pending() time.sleep(1)
傳遞引數:
import schedule def greet(name): print('Hello', name) schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob') while True: schedule.run_pending()
裝飾器同樣能傳遞引數:
from schedule import every, repeat, run_pending @repeat(every().second, 'World') @repeat(every().minute, 'Mars') def hello(planet): print('Hello', planet) while True: run_pending()
取消任務:
import schedule i = 0 def some_task(): global i i += 1 print(i) if i == 10: schedule.cancel_job(job) print('cancel job') exit(0) job = schedule.every().second.do(some_task) while True: schedule.run_pending()
執行一次任務:
import time import schedule def job_that_executes_once(): print('Hello') return schedule.CancelJob schedule.every().minute.at(':34').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1)
根據標籤檢索任務:
# 檢索所有任務:schedule.get_jobs() import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') friends = schedule.get_jobs('friend') print(friends)
根據標籤取消任務:
# 取消所有任務:schedule.clear() import schedule def greet(name): print('Hello {}'.format(name)) if name == 'Cancel': schedule.clear('second-tasks') print('cancel second-tasks') schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend') schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest') while True: schedule.run_pending()
執行任務到某時間:
import schedule from datetime import datetime, timedelta, time def job(): print('working...') schedule.every().second.until('23:59').do(job) # 今天23:59停止 schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止 schedule.every().second.until(timedelta(hours=8)).do(job) # 8小時後停止 schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止 schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止 while True: schedule.run_pending()
馬上執行所有任務(主要用於測試):
import schedule def job(): print('working...') def job1(): print('Hello...') schedule.every().monday.at('12:40').do(job) schedule.every().tuesday.at('16:40').do(job1) schedule.run_all() schedule.run_all(delay_seconds=3) # 任務間延遲3秒
並行執行:使用 Python 內建佇列實現:
import threading import time import schedule def job1(): print("I'm running on thread %s" % threading.current_thread()) def job2(): print("I'm running on thread %s" % threading.current_thread()) def job3(): print("I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job1) schedule.every(10).seconds.do(run_threaded, job2) schedule.every(10).seconds.do(run_threaded, job3) while True: schedule.run_pending() time.sleep(1)
APScheduler(advanceded python scheduler)基於Quartz的一個Python定時任務框架,實現了Quartz的所有功能,使用起來十分方便。提供了基於日期、固定時間間隔以及crontab型別的任務,並且可以持久化任務。基於這些功能,我們可以很方便的實現一個Python定時任務系統。
它有以下三個特點:
APScheduler有四種組成部分:
範例程式碼:
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime # 輸出時間 def job(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # BlockingScheduler sched = BlockingScheduler() sched.add_job(my_job, 'interval', seconds=5, id='my_job_id') sched.start()
Job作為APScheduler最小執行單位。建立Job時指定執行的函數,函數中所需引數,Job執行時的一些設定資訊。
構建說明:
Trigger繫結到Job,在scheduler排程篩選Job時,根據觸發器的規則計算出Job的觸發時間,然後與當前時間比較確定此Job是否會被執行,總之就是根據trigger規則計算出下一個執行時間。
目前APScheduler支援觸發器:
觸發器引數:date
date定時,作業只執行一次。
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text']) sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])
觸發器引數:interval
interval間隔排程
觸發器引數:cron
cron排程
CronTrigger可用的表示式:
# 6-8,11-12月第三個週五 00:00, 01:00, 02:00, 03:00執行 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 每週一到週五執行 直到2024-05-30 00:00:00 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'
Executor在scheduler中初始化,另外也可通過scheduler的add_executor動態新增Executor。每個executor都會繫結一個alias,這個作為唯一標識繫結到Job,在實際執行時會根據Job繫結的executor找到實際的執行器物件,然後根據執行器物件執行Job。
Executor的種類會根據不同的排程來選擇,如果選擇AsyncIO作為排程的庫,那麼選擇AsyncIOExecutor,如果選擇tornado作為排程的庫,選擇TornadoExecutor,如果選擇啟動程序作為排程,選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以。
Executor的選擇需要根據實際的scheduler來選擇不同的執行器。目前APScheduler支援的Executor:
Jobstore在scheduler中初始化,另外也可通過scheduler的add_jobstore動態新增Jobstore。每個jobstore都會繫結一個alias,scheduler在Add Job時,根據指定的jobstore在scheduler中找到相應的jobstore,並將job新增到jobstore中。作業記憶體決定任務的儲存方式, 預設儲存在記憶體中(MemoryJobStore),重啟後就沒有了。APScheduler支援的任務記憶體有:
不同的任務記憶體可以在排程器的設定中進行設定(見排程器)
Event是APScheduler在進行某些操作時觸發相應的事件,使用者可以自定義一些函數來監聽這些事件,當觸發某些Event時,做一些具體的操作。常見的比如。Job執行異常事件 EVENT_JOB_ERROR。Job執行時間錯過事件 EVENT_JOB_MISSED。
目前APScheduler定義的Event:
Listener表示使用者自定義監聽的一些Event,比如當Job觸發了EVENT_JOB_MISSED事件時可以根據需求做一些其他處理。
Scheduler是APScheduler的核心,所有相關元件通過其定義。scheduler啟動之後,將開始按照設定的任務進行排程。除了依據所有定義Job的trigger生成的將要排程時間喚醒排程之外。當發生Job資訊變更時也會觸發排程。
APScheduler支援的排程器方式如下,比較常用的為BlockingScheduler和BackgroundScheduler
Scheduler新增job流程:
Scheduler排程流程:
Celery是一個簡單,靈活,可靠的分散式系統,用於處理大量訊息,同時為操作提供維護此類系統所需的工具, 也可用於任務排程。Celery 的設定比較麻煩,如果你只是需要一個輕量級的排程工具,Celery 不會是一個好選擇。
Celery 是一個強大的分散式任務佇列,它可以讓任務的執行完全脫離主程式,甚至可以被分配到其他主機上執行。我們通常使用它來實現非同步任務(async task)和定時任務(crontab)。非同步任務比如是傳送郵件、或者檔案上傳, 影象處理等等一些比較耗時的操作 ,定時任務是需要在特定時間執行的任務。
需要注意,celery本身並不具備任務的儲存功能,在排程任務的時候肯定是要把任務存起來的,因此在使用celery的時候還需要搭配一些具備儲存、存取功能的工具,比如:訊息佇列、Redis快取、資料庫等。官方推薦的是訊息佇列RabbitMQ,有些時候使用Redis也是不錯的選擇。
它的架構組成如下圖:
Celery架構,它採用典型的生產者-消費者模式,主要由以下部分組成:
實際應用中,使用者從Web前端發起一個請求,我們只需要將請求所要處理的任務丟入任務佇列broker中,由空閒的worker去處理任務即可,處理的結果會暫存在後臺資料庫backend中。我們可以在一臺機器或多臺機器上同時起多個worker程序來實現分散式地並行處理任務。
Celery定時任務範例:
Apache Airflow 是Airbnb開源的一款資料流程工具,目前是Apache孵化專案。以非常靈活的方式來支援資料的ETL過程,同時還支援非常多的外掛來完成諸如HDFS監控、郵件通知等功能。Airflow支援單機和分散式兩種模式,支援Master-Slave模式,支援Mesos等資源排程,有非常好的擴充套件性。被大量公司採用。
Airflow使用Python開發,它通過DAGs(Directed Acyclic Graph, 有向無環圖)來表達一個工作流中所要執行的任務,以及任務之間的關係和依賴。比如,如下的工作流中,任務T1執行完成,T2和T3才能開始執行,T2和T3都執行完成,T4才能開始執行。
Airflow提供了各種Operator實現,可以完成各種任務實現:
除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務需求。
一些情況下,我們需要根據執行結果執行不同的任務,這樣工作流會產生分支。如:
這種需求可以使用BranchPythonOperator來實現。
通常,在一個運維繫統,資料分析系統,或測試系統等大型系統中,我們會有各種各樣的依賴需求。包括但不限於:
crontab 可以很好地處理定時執行任務的需求,但僅能管理時間上的依賴。Airflow 的核心概念 DAG(有向無環圖)—— 來表現工作流。
DAGs:即有向無環圖(Directed Acyclic Graph),將所有需要執行的tasks按照依賴關係組織起來,描述的是所有tasks執行順序。
Operators:可以簡單理解為一個class,描述了DAG中某個的task具體要做的事。其中,airflow內建了很多operators,如BashOperator 執行一個bash 命令,PythonOperator 呼叫任意的Python 函數,EmailOperator 用於傳送郵件,HTTPOperator 用於傳送HTTP請求, SqlOperator 用於執行SQL命令等等,同時,使用者可以自定義Operator,這給使用者提供了極大的便利性。
Tasks:Task 是 Operator的一個範例,也就是DAGs中的一個node。
Task Instance:task的一次執行。Web 介面中可以看到task instance 有自己的狀態,包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
Task Relationships:DAGs中的不同Tasks之間可以有依賴關係,如 Task1 >> Task2,表明Task2依賴於Task2了。通過將DAGs和Operators結合起來,使用者就可以建立各種複雜的 工作流(workflow)。
在一個可延伸的生產環境中,Airflow 含有以下元件:
後設資料庫:這個資料庫儲存有關任務狀態的資訊。
排程器:Scheduler 是一種使用 DAG 定義結合後設資料中的任務狀態來決定哪些任務需要被執行以及任務執行優先順序的過程。排程器通常作為服務執行。
執行器:Executor 是一個訊息佇列程序,它被繫結到排程器中,用於確定實際執行每個任務計劃的工作程序。有不同型別的執行器,每個執行器都使用一個指定工作程序的類來執行任務。例如,LocalExecutor 使用與排程器程序在同一臺機器上執行的並行程序執行任務。其他像 CeleryExecutor 的執行器使用存在於獨立的工作機器叢集中的工作程序執行任務。
Workers:這些是實際執行任務邏輯的程序,由正在使用的執行器確定。
Worker的具體實現由組態檔中的executor來指定,airflow支援多種Executor:
生產環境一般使用CeleryExecutor和KubernetesExecutor。
使用CeleryExecutor的架構如圖:
使用KubernetesExecutor的架構如圖:
以上就是讓Python程式定時執行的8種方法整理的詳細內容,更多關於Python程式定時執行的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45