首頁 > 軟體

Python利用sched模組實現定時任務

2023-04-04 06:01:03

今天我們來介紹一下Python當中的定時任務,主要用到的模組是sched,當然除了該模組之外,還有其他的例如ApScheduler,但是相比較與sched模組而言,後者還需要另外通過pip命令來下載,稍顯麻煩。那麼在這篇教學當中我們來講講sched模組的使用教學。

牛刀小試

我們先來看下面這個案例,程式碼如下

import sched
import time

def say_hello(name):
    print(f"Hello, world, {name}")

scheduler = sched.scheduler(time.time, time.sleep)

scheduler.enter(5, 1, say_hello, ("張三", ))
scheduler.run()

那麼上述的程式碼中,第一步首先則是範例化一個定時器,通過如下的程式碼

import sched

scheduler = sched.scheduler()

接下來我們通過enter()方法來執行定時任務的操作,其中的引數分別是延遲的時間、任務的優先順序以及具體的執行函數和執行函數中的引數。像如上的程式碼就會在延遲5秒鐘之後執行say_hello()函數

當然要是延遲的時間相等的時候,我們可以設定任務執行的優先順序來指定函數方法執行的順序,例如有如下的程式碼

import sched
import time

def say_hello(name):
    print(f"Hello, world, {name}")

def say_hello_2(name):
    print(f"Hello, {name}")

scheduler = sched.scheduler(time.time, time.sleep)

scheduler.enter(5, 2, say_hello, ("張三", ))
scheduler.enter(5, 1, say_hello_2, ("李四", ))
scheduler.run()

如上述程式碼,儘管延遲的時間都是一樣的,但是say_hello()方法的優先順序明顯要比say_hello_2()方法要低一些,因此後者會優先執行。

進階使用

除了讓函數延遲執行,我們還可以讓其重複執行,具體這樣來操作,程式碼如下

import sched
import time

def say_hello():
    print("Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

def repeat_task():
    scheduler.enter(5, 1, say_hello, ())
    scheduler.enter(5, 1, repeat_task, ())

repeat_task()
scheduler.run()

這裡我們新建了一個repeat_task()自定義函數,呼叫了scheduler.enter()方法5秒鐘執行一次之前定義的say_hello()函數

在固定時間執行任務

同時我們還可以讓任務在指定的時間執行,這裡用到scheduler.entertabs()方法,程式碼如下

import sched
import time

def say_hello():
    print("Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# 指定時間執行任務
specific_time = time.time() + 5  # 距離現在的5秒鐘之後執行
scheduler.enterabs(specific_time, 1, say_hello, ())

scheduler.run()

我們傳入其中引數使其在指定的時間,也就是距離當下的5秒鐘之後來執行任務

執行多個任務

這裡仍然是呼叫enter()方法來執行多個任務,程式碼如下

import sched
import time

def task_one():
    print("Task One - Hello, world!")
    
def task_two():
    print("Task Two - Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# 任務一在兩秒鐘只有執行
scheduler.enter(2, 1, task_one, ())

# 任務二在五秒鐘之後執行
scheduler.enter(5, 1, task_two, ())

scheduler.run()

這裡定義了兩個函數,task_onetask_two裡面分是同樣的執行邏輯,列印出“Hello, world!”,然後task_one()是在兩秒鐘之後執行而task_two()則是在5秒鐘之後執行,兩者執行的優先順序都是一樣的。

以不同的優先順序執行不同的任務

這回我們給task_one()task_two()賦予不同的優先順序,看一看執行的結果如下

import sched
import time

def task_one():
    print("Task One - Hello, world!")
    
def task_two():
    print("Task Two - Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# 優先順序是1
scheduler.enter(2, 2, task_one, ())

# 優先順序是2
scheduler.enter(5, 1, task_two, ())

scheduler.run()

output

Task One - Hello, world!
Task Two - Hello, world!

上述的程式碼會在停頓兩秒之後執行task_one()函數,再停頓3秒之後執行task_two()函數

定時任務加上取消方法

我們給定時任務新增上取消的方法,程式碼如下

import sched
import time

def task_one():
    print("Task One - Hello, world!")
    
def task_two():
    print("Task Two - Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# 任務一在兩秒鐘只有執行
task_one_event = scheduler.enter(2, 1, task_one, ())

# 任務二在五秒鐘之後執行
task_two_event = scheduler.enter(5, 1, task_two, ())

# 取消執行task_one
scheduler.cancel(task_one_event)

scheduler.run()

我們將兩秒鐘之後執行的task_one()方法給取消掉,最後就只執行了task_two()方法,也就列印出來“Task Two - Hello, world!”

執行備份程式

我們來寫一個備份的指令碼,在每天固定的時間將檔案備份,程式碼如下

import sched
import time
import shutil

def backup_files():
    source = '路徑/files'
    destination = '路徑二'
    shutil.copytree(source, destination)

def schedule_backup():
    # 建立新的定時器
    scheduler = sched.scheduler(time.time, time.sleep)

    # 備份程式在每天的1點來執行
    backup_time = time.strptime('01:00:00', '%H:%M:%S')
    backup_event = scheduler.enterabs(time.mktime(backup_time), 1, backup_files, ())

    # 開啟定時任務
    scheduler.run()

schedule_backup()

我們通過shutil模組當中的copytree()方法來執行拷貝檔案,然後在每天的1點準時執行

執行定時分發郵件的程式

最後我們來執行定時分發郵件的程式,程式碼如下

import sched
import time
import smtplib
from email.mime.text import MIMEText

def send_email(subject, message, from_addr, to_addr, smtp_server):
    # 郵件的主體資訊
    email = MIMEText(message)
    email['Subject'] = subject
    email['From'] = from_addr
    email['To'] = to_addr

    # 發郵件
    with smtplib.SMTP(smtp_server) as server:
        server.send_message(email)

def send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time):
    # 建立定時任務的範例
    scheduler = sched.scheduler(time.time, time.sleep)

    # 定時郵件
    scheduler.enterabs(scheduled_time, 1, send_email, argument=(subject, message, from_addr, to_addr, smtp_server))

    # 開啟定時器
    scheduler.run()

subject = 'Test Email'
message = 'This is a test email'
from_addr = 'test@example.com'
to_addr = 'test@example.com'
smtp_server = 'smtp.test.com'

scheduled_time = time.time() + 60 # 一分鐘之後執行程式
send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time)

到此這篇關於Python利用sched模組實現定時任務的文章就介紹到這了,更多相關Python sched定時任務內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com