首頁 > 軟體

Python週期任務神器之Schedule模組使用詳解

2022-04-19 13:00:59

如果你想在Linux伺服器上週期性地執行某個 Python 指令碼,最出名的選擇應該是 Crontab 指令碼,但是 Crontab 具有以下缺點:

1.不方便執行秒級的任務。

2.當需要執行的定時任務有上百個的時候,Crontab的管理就會特別不方便

另外一個選擇是 Celery,但是 Celery 的設定比較麻煩,如果你只是需要一個輕量級的排程工具,Celery 不會是一個好選擇。

在你想要使用一個輕量級的任務排程工具,而且希望它儘量簡單、容易使用、不需要外部依賴,最好能夠容納 Crontab 的所有基本功能,那麼 Schedule 模組是你的不二之選。

使用它來排程任務可能只需要幾行程式碼,感受一下:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

上面的程式碼錶示每10分鐘執行一次 job 函數,非常簡單方便。你只需要引入 schedule 模組,通過呼叫 scedule.every(時間數).時間型別.do(job) 釋出週期任務。

釋出後的週期任務需要用 run_pending 函數來檢測是否執行,因此需要一個 While 迴圈不斷地輪詢這個函數。

下面具體講講Schedule模組的安裝和初級、進階使用方法。

1.準備

請選擇以下任一種方式輸入命令安裝依賴:

1. Windows 環境 開啟 Cmd (開始-執行-CMD)。

2. MacOS 環境 開啟 Terminal (command+空格輸入Terminal)。

3. 如果你用的是 VSCode編輯器 或 Pycharm,可以直接使用介面下方的Terminal.

pip install schedule

2.基本使用

最基本的使用在文首已經提到過,下面給大家展示更多的排程任務例子:

import schedule
import time

def job():
    print("I'm working...")

# 每十分鐘執行任務
schedule.every(10).minutes.do(job)
# 每個小時執行任務
schedule.every().hour.do(job)
# 每天的10:30執行任務
schedule.every().day.at("10:30").do(job)
# 每個月執行任務
schedule.every().monday.do(job)
# 每個星期三的13:15分執行任務
schedule.every().wednesday.at("13:15").do(job)
# 每分鐘的第17秒執行任務
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

可以看到,從月到秒的設定,上面的例子都覆蓋到了。不過如果你想只執行一次任務的話,可以這麼配:

import schedule
import time

def job_that_executes_once():
    # 此處編寫的任務只會執行一次...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)

引數傳遞

如果你有引數需要傳遞給作業去執行,你只需要這麼做:

import schedule

def greet(name):
    print('Hello', name)

# do() 將額外的引數傳遞給job函數
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

獲取目前所有的作業

如果你想獲取目前所有的作業:

import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()

取消所有作業

如果某些機制觸發了,你需要立即清除當前程式的所有作業:

import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().second.do(greet)

schedule.clear()

標籤功能

在設定作業的時候,為了後續方便管理作業,你可以給作業打個標籤,這樣你可以通過標籤過濾獲取作業或取消作業。

import schedule

def greet(name):
    print('Hello {}'.format(name))

# .tag 打標籤
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# get_jobs(標籤):可以獲取所有該標籤的任務
friends = schedule.get_jobs('friend')

# 取消所有 daily-tasks 標籤的任務
schedule.clear('daily-tasks')

設定作業截止時間

如果你需要讓某個作業到某個時間截止,你可以通過這個方法:

import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# 每個小時執行作業,18:30後停止
schedule.every(1).hours.until("18:30").do(job)

# 每個小時執行作業,2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# 每個小時執行作業,8個小時後停止
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# 每個小時執行作業,11:32:42後停止
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# 每個小時執行作業,2020-5-17 11:36:20後停止
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)

截止日期之後,該作業將無法執行。

立即執行所有作業,而不管其安排如何

如果某個機制觸發了,你需要立即執行所有作業,可以呼叫 schedule.run_all() :

import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# 立即執行所有作業,每次作業間隔10秒
schedule.run_all(delay_seconds=10)

3.高階使用

裝飾器安排作業

如果你覺得設定作業這種形式太囉嗦了,也可以使用裝飾器模式:

from schedule import every, repeat, run_pending
import time

# 此裝飾器效果等同於 schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)

並行執行

預設情況下,Schedule 按順序執行所有作業。其背後的原因是,很難找到讓每個人都高興的並行執行模型。

不過你可以通過多執行緒的形式來執行每個作業以解決此限制:

import threading
import time
import schedule

def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)

紀錄檔記錄

Schedule 模組同時也支援 logging 紀錄檔記錄,這麼使用:

import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# 紀錄檔級別為DEBUG
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)

schedule.run_all()

schedule.clear()

效果如下:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs

例外處理

Schedule 不會自動捕捉異常,它遇到異常會直接丟擲,這會導致一個嚴重的問題:後續所有的作業都會被中斷執行,因此我們需要捕捉到這些異常。

你可以手動捕捉,但是某些你預料不到的情況需要程式進行自動捕獲,加一個裝飾器就能做到了:

import functools

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

這樣,bad_task 在執行時遇到的任何錯誤,都會被 catch_exceptions 捕獲,這點在保證排程任務正常運轉的時候非常關鍵。

到此這篇關於Python週期任務神器之Schedule模組使用詳解的文章就介紹到這了,更多相關Python Schedule模組內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com