首頁 > 軟體

一文詳解Golang 定時任務庫 gron 設計和原理

2022-08-18 14:02:38

 cron 簡介

在 Unix-like 作業系統中,有一個大家都很熟悉的 cli 工具,它能夠來處理定時任務,週期性任務,這就是: cron。 你只需要簡單的語法控制就能實現任意【定時】的語意。用法上可以參考一下這個 Crontab Guru Editor,做的非常精巧。

簡單說,每一個位都代表了一個時間維度,* 代表全集,所以,上面的語意是:在每天早上的4點05分觸發任務。

但 cron 畢竟只是一個作業系統級別的工具,如果定時任務失敗了,或者壓根沒啟動,cron 是沒法提醒開發者這一點的。並且,cron 和 正規表示式都有一種魔力,不知道大家是否感同身受,這裡參照同事的一句名言:

這世界上有些語言非常相似: shell指令碼, es查詢的那個dsl語言, 定時任務的crontab, 正規表示式. 他們相似就相似在每次要寫的時候基本都得重新現學一遍。

正巧,最近看到了 gron 這個開源專案,它是用 Golang 實現一個並行安全的定時任務庫。實現非常簡單精巧,程式碼量也不多。今天我們就來一起結合原始碼看一下,怎樣基於 Golang 的能力做出來一個【定時任務庫】。

gron

Gron provides a clear syntax for writing and deploying cron jobs.

gron 是一個泰國小哥在 2016 年開源的作品,它的特點就在於非常簡單和清晰的語意來定義【定時任務】,你不用再去記 cron 的語法。我們來看下作為使用者怎樣上手。

首先,我們還是一個 go get 安裝依賴:

$ go get github.com/roylee0704/gron

假設我們期望在【時機】到了以後,要做的工作是列印一個字串,每一個小時執行一次,我們就可以這樣:

package main

import (
	"fmt"
	"time"
	"github.com/roylee0704/gron"
)
func main() {
	c := gron.New()
	c.AddFunc(gron.Every(1*time.Hour), func() {
		fmt.Println("runs every hour.")
	})
	c.Start()
}

非常簡單,而且即便是在 c.Start 之後我們依然可以新增新的定時任務進去。支援了很好的擴充套件性。

定時引數

注意到我們呼叫 gron.New().AddFunc() 時傳入了一個 gron.Every(1*time.Hour)

這裡其實你可以傳入任何一個 time.Duration,從而把排程間隔從 1 小時調整到 1 分鐘甚至 1 秒。

除此之外,gron 還很貼心地封裝了一個 xtime 包用來把常見的 time.Duration 封裝起來,這裡我們開箱即用。

import "github.com/roylee0704/gron/xtime"

gron.Every(1 * xtime.Day)
gron.Every(1 * xtime.Week)

很多時候我們不僅僅某個任務在當天執行,還希望是我們指定的時刻,而不是依賴程式啟動時間,機械地加 24 hour。gron 對此也做了很好的支援:

gron.Every(30 * xtime.Day).At("00:00")
gron.Every(1 * xtime.Week).At("23:59")

我們只需指定 At("hh:mm") 就可以實現在指定時間執行。

原始碼解析

這一節我們來看看 gron 的實現原理。

所謂定時任務,其實包含兩個層面:

  • 觸發器。即我們希望這個任務在什麼時間點,什麼週期被觸發;
  • 任務。即我們在觸發之後,希望執行的任務,類比到我們上面範例的 fmt.Println。

對這兩個概念的封裝和擴充套件是一個定時任務庫必須考慮的。

而同時,我們是在 Golang 的協程上跑程式的,意味著這會是一個長期執行的協程,否則你即便指定了【一個月後幹XXX】這個任務,程式兩天後掛了,也就無法實現你的訴求了。

所以,我們還希望有一個 manager 的角色,來管理我們的一組【定時任務】,如何排程,什麼時候啟動,怎麼停止,啟動了以後還想加新任務是否支援。

Cron

在 gron 的體系裡,Cron 物件(我們上面通過 gron.New 建立出來的)就是我們的 manager,而底層的一個個【定時任務】則對應到 Cron 物件中的一個個 Entry:

// Cron provides a convenient interface for scheduling job such as to clean-up
// database entry every month.
//
// Cron keeps track of any number of entries, invoking the associated func as
// specified by the schedule. It may also be started, stopped and the entries
// may be inspected.
type Cron struct {
	entries []*Entry
	running bool
	add     chan *Entry
	stop    chan struct{}
}

// New instantiates new Cron instant c.
func New() *Cron {
	return &Cron{
		stop: make(chan struct{}),
		add:  make(chan *Entry),
	}
}
  • entries 就是定時任務的核心能力,它記錄了一組【定時任務】;
  • running 用來標識這個 Cron 是否已經啟動;
  • add 是一個channel,用來支援在 Cron 啟動後,新增的【定時任務】;
  • stop 同樣是個channel,注意到是空結構體,用來控制 Cron 的停止。這個其實是經典寫法了,對日常開發也有借鑑意義,我們待會兒會好好看一下。

我們觀察到,當呼叫 gron.New() 方法後,得到的是一個指向 Cron 物件的指標。此時只是初始化了 stop 和 add 兩個 channel,沒有啟動排程。

Entry

重頭戲來了,Cron 裡面的 []*Entry 其實就代表了一組【定時任務】,每個【定時任務】可以簡化理解為 <觸發器,任務> 組成的一個 tuple。

// Entry consists of a schedule and the job to be executed on that schedule.
type Entry struct {
	Schedule Schedule
	Job      Job

	// the next time the job will run. This is zero time if Cron has not been
	// started or invalid schedule.
	Next time.Time

	// the last time the job was run. This is zero time if the job has not been
	// run.
	Prev time.Time
}

// Schedule is the interface that wraps the basic Next method.
//
// Next deduces next occurring time based on t and underlying states.
type Schedule interface {
	Next(t time.Time) time.Time
}

// Job is the interface that wraps the basic Run method.
//
// Run executes the underlying func.
type Job interface {
	Run()
}
  • Schedule 代表了一個【觸發器】,或者說一個定時策略。它只包含一個 Next 方法,接受一個時間點,業務要返回下一次觸發調動的時間點。
  • Job 則是對【任務】的抽象,只需要實現一個 Run 方法,沒有入參出參。

除了這兩個核心依賴外,Entry 結構還包含了【前一次執行時間點】和【下一次執行時間點】,這個目前可以忽略,只是為了輔助程式碼用。

按照時間排序

// byTime is a handy wrapper to chronologically sort entries.
type byTime []*Entry

func (b byTime) Len() int      { return len(b) }
func (b byTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }

// Less reports `earliest` time i should sort before j.
// zero time is not `earliest` time.
func (b byTime) Less(i, j int) bool {

	if b[i].Next.IsZero() {
		return false
	}
	if b[j].Next.IsZero() {
		return true
	}

	return b[i].Next.Before(b[j].Next)
}

這裡是對 Entry 列表的簡單封裝,因為我們可能同時有多個 Entry 需要排程,處理的順序很重要。這裡實現了 sort 的介面, 有了 Len()Swap()Less() 我們就可以用 sort.Sort() 來排序了。

此處的排序策略是按照時間大小。

新增定時任務

我們在範例裡面出現過呼叫 AddFunc() 來加入一個 gron.Every(xxx) 這樣一個【定時任務】。其實這是給使用者提供的簡單封裝。

// JobFunc is an adapter to allow the use of ordinary functions as gron.Job
// If f is a function with the appropriate signature, JobFunc(f) is a handler
// that calls f.
//
// todo: possibly func with params? maybe not needed.
type JobFunc func()

// Run calls j()
func (j JobFunc) Run() {
	j()
}


// AddFunc registers the Job function for the given Schedule.
func (c *Cron) AddFunc(s Schedule, j func()) {
	c.Add(s, JobFunc(j))
}

// Add appends schedule, job to entries.
//
// if cron instant is not running, adding to entries is trivial.
// otherwise, to prevent data-race, adds through channel.
func (c *Cron) Add(s Schedule, j Job) {

	entry := &Entry{
		Schedule: s,
		Job:      j,
	}

	if !c.running {
		c.entries = append(c.entries, entry)
		return
	}
	c.add <- entry
}

JobFunc 實現了我們上一節提到的 Job 介面,基於此,我們就可以讓使用者直接傳入一個 func() 就ok,內部轉成 JobFunc,再利用通用的 Add 方法將其加入到 Cron 中即可。

注意,這裡的 Add 方法就是新增定時任務的核心能力了,我們需要觸發器 Schedule,任務 Job。並以此來構造出一個定時任務 Entry。

若 Cron 範例還沒啟動,加入到 Cron 的 entries 列表裡就ok,隨後啟動的時候會處理。但如果已經啟動了,就直接往 add 這個 channel 中塞,走額外的新增排程路徑。

啟動和停止

// Start signals cron instant c to get up and running.
func (c *Cron) Start() {
	c.running = true
	go c.run()
}


// Stop halts cron instant c from running.
func (c *Cron) Stop() {

	if !c.running {
		return
	}
	c.running = false
	c.stop <- struct{}{}
}

我們先 high level 地看一下一個 Cron 的啟動和停止。

  • Start 方法執行的時候會先將 running 變數置為 true,用來標識範例已經啟動(啟動前後加入的定時任務 Entry 處理策略是不同的,所以這裡需要標識),然後啟動一個 goroutine 來實際跑啟動的邏輯。
  • Stop 方法則會將 running 置為 false,然後直接往 stop channel 塞一個空結構體即可。

ok,有了這個心裡預期,我們來看看 c.run() 裡面幹了什麼事:

var after = time.After


// run the scheduler...
//
// It needs to be private as it's responsible of synchronizing a critical
// shared state: `running`.
func (c *Cron) run() {

	var effective time.Time
	now := time.Now().Local()

	// to figure next trig time for entries, referenced from now
	for _, e := range c.entries {
		e.Next = e.Schedule.Next(now)
	}

	for {
		sort.Sort(byTime(c.entries))
		if len(c.entries) > 0 {
			effective = c.entries[0].Next
		} else {
			effective = now.AddDate(15, 0, 0) // to prevent phantom jobs.
		}

		select {
		case now = <-after(effective.Sub(now)):
			// entries with same time gets run.
			for _, entry := range c.entries {
				if entry.Next != effective {
					break
				}
				entry.Prev = now
				entry.Next = entry.Schedule.Next(now)
				go entry.Job.Run()
			}
		case e := <-c.add:
			e.Next = e.Schedule.Next(time.Now())
			c.entries = append(c.entries, e)
		case <-c.stop:
			return // terminate go-routine.
		}
	}
}

重點來了,看看我們是如何把上面 Cron, Entry, Schedule, Job 串起來的。

  • 首先拿到 local 的時間 now;
  • 遍歷所有 Entry,呼叫 Next 方法拿到各個【定時任務】下一次執行的時間點;
  • 對所有 Entry 按照時間排序(我們上面提過的 byTime);
  • 拿到第一個要到期的時間點,在 select 裡面通過 time.After 來監聽。到點了就起動新的 goroutine 跑對應 entry 裡的 Job,並回到 for 迴圈,繼續重新 sort,再走同樣的流程;
  • 若 add channel 裡有新的 Entry 被加進來,就加入到 Cron 的 entries 裡,觸發新的 sort;
  • 若 stop channel 收到了訊號,就直接 return,結束執行。

整體實現還是非常簡潔的,大家可以感受一下。

Schedule

前面其實我們暫時將觸發器的複雜性封裝在 Schedule 介面中了,但怎麼樣實現一個 Schedule 呢?

尤其是注意,我們還支援 At 操作,也就是指定 Day,和具體的小時,分鐘。回憶一下:

gron.Every(30 * xtime.Day).At("00:00")
gron.Every(1 * xtime.Week).At("23:59")

這一節我們就來看看,gron.Every 幹了什麼事,又是如何支援 At 方法的。

// Every returns a Schedule reoccurs every period p, p must be at least
// time.Second.
func Every(p time.Duration) AtSchedule {

	if p < time.Second {
		p = xtime.Second
	}

	p = p - time.Duration(p.Nanoseconds())%time.Second // truncates up to seconds

	return &periodicSchedule{
		period: p,
	}
}

gron 的 Every 函數接受一個 time.Duration,返回了一個 AtSchedule 介面。我待會兒會看,這裡注意,Every 裡面是會把【秒】級以下給截掉。

我們先來看下,最後返回的這個 periodicSchedule 是什麼:

type periodicSchedule struct {
	period time.Duration
}

// Next adds time t to underlying period, truncates up to unit of seconds.
func (ps periodicSchedule) Next(t time.Time) time.Time {
	return t.Truncate(time.Second).Add(ps.period)
}

// At returns a schedule which reoccurs every period p, at time t(hh:ss).
//
// Note: At panics when period p is less than xtime.Day, and error hh:ss format.
func (ps periodicSchedule) At(t string) Schedule {
	if ps.period < xtime.Day {
		panic("period must be at least in days")
	}

	// parse t naively
	h, m, err := parse(t)

	if err != nil {
		panic(err.Error())
	}

	return &atSchedule{
		period: ps.period,
		hh:     h,
		mm:     m,
	}
}

// parse naively tokenises hours and minutes.
//
// returns error when input format was incorrect.
func parse(hhmm string) (hh int, mm int, err error) {

	hh = int(hhmm[0]-'0')*10 + int(hhmm[1]-'0')
	mm = int(hhmm[3]-'0')*10 + int(hhmm[4]-'0')

	if hh < 0 || hh > 24 {
		hh, mm = 0, 0
		err = errors.New("invalid hh format")
	}
	if mm < 0 || mm > 59 {
		hh, mm = 0, 0
		err = errors.New("invalid mm format")
	}

	return
}

可以看到,所謂 periodicSchedule 就是一個【週期性觸發器】,只維護一個 time.Duration 作為【週期】。

periodicSchedule 實現 Next 的方式也很簡單,把秒以下的截掉之後,直接 Add(period),把週期加到當前的 time.Time 上,返回新的時間點。這個大家都能想到。

重點在於,對 At 能力的支援。我們來關注下 func (ps periodicSchedule) At(t string) Schedule 這個方法

  • 若週期連 1 天都不到,不支援 At 能力,因為 At 本質是在選定的一天內,指定小時,分鐘,作為輔助。連一天都不到的週期,是要精準處理的;
  • 將使用者輸入的形如 "23:59" 時間字串解析出來【小時】和【分鐘】;
  • 構建出一個 atSchedule 物件,包含了【週期時長】,【小時】,【分鐘】。

ok,這一步只是拿到了材料,那具體怎樣處理呢?這個還是得繼續往下走,看看 atSchedule 結構幹了什麼:

type atSchedule struct {
	period time.Duration
	hh     int
	mm     int
}

// reset returns new Date based on time instant t, and reconfigure its hh:ss
// according to atSchedule's hh:ss.
func (as atSchedule) reset(t time.Time) time.Time {
	return time.Date(t.Year(), t.Month(), t.Day(), as.hh, as.mm, 0, 0, time.UTC)
}

// Next returns **next** time.
// if t passed its supposed schedule: reset(t), returns reset(t) + period,
// else returns reset(t).
func (as atSchedule) Next(t time.Time) time.Time {
	next := as.reset(t)
	if t.After(next) {
		return next.Add(as.period)
	}
	return next
}

其實只看這個 Next 的實現即可。我們從 periodSchedule 那裡獲取了三個屬性。

在呼叫 Next 方法時,先做 reset,根據原有 time.Time 的年,月,日,以及使用者輸入的 At 中的小時,分鐘,來構建出來一個 time.Time 作為新的時間點。

此後判斷是在哪個週期,如果當前週期已經過了,那就按照下個週期的時間點返回。

到這裡,一切就都清楚了,如果我們不用 At 能力,直接 gron.Every(xxx),那麼直接就會呼叫

t.Truncate(time.Second).Add(ps.period)

拿到一個新的時間點返回。

而如果我們要用 At 能力,指定當天的小時,分鐘。那就會走到 periodicSchedule.At 這裡,解析出【小時】和【分鐘】,最後走 Next 返回 reset 之後的時間點。

這個和 gron.Every 方法返回的 AtSchedule 介面其實是完全對應的:

// AtSchedule extends Schedule by enabling periodic-interval & time-specific setup
type AtSchedule interface {
	At(t string) Schedule
	Schedule
}

直接就有一個 Schedule 可以用,但如果你想針對天級以上的 duration 指定時間,也可以走 At 方法,也會返回一個 Schedule 供我們使用。

擴充套件性

gron 裡面對於所有的依賴也都做成了【依賴介面而不是實現】。Cron 的 Add 函數的入參也是兩個介面,這裡可以隨意替換:func (c *Cron) Add(s Schedule, j Job)

最核心的兩個實體依賴 Schedule, Job 都可以用你自定義的實現來替換掉。

如實現一個新的 Job:

type Reminder struct {
	Msg string
}

func (r Reminder) Run() {
  fmt.Println(r.Msg)
}

事實上,我們上面提到的 periodicSchedule 以及 atSchedule 就是 Schedule 介面的具體實現。我們也完全可以不用 gron.Every,而是自己寫一套新的 Schedule 實現。只要實現 Next(p time.Duration) time.Time 即可。

我們來看一個完整用法案例:

package main

import (
	"fmt"
	"github.com/roylee0704/gron"
	"github.com/roylee0704/gron/xtime"
)
type PrintJob struct{ Msg string }
func (p PrintJob) Run() {
	fmt.Println(p.Msg)
}

func main() {

	var (
		// schedules
		daily     = gron.Every(1 * xtime.Day)
		weekly    = gron.Every(1 * xtime.Week)
		monthly   = gron.Every(30 * xtime.Day)
		yearly    = gron.Every(365 * xtime.Day)

		// contrived jobs
		purgeTask = func() { fmt.Println("purge aged records") }
		printFoo  = printJob{"Foo"}
		printBar  = printJob{"Bar"}
	)

	c := gron.New()

	c.Add(daily.At("12:30"), printFoo)
	c.AddFunc(weekly, func() { fmt.Println("Every week") })
	c.Start()

	// Jobs may also be added to a running Gron
	c.Add(monthly, printBar)
	c.AddFunc(yearly, purgeTask)

	// Stop Gron (running jobs are not halted).
	c.Stop()
}

經典寫法-控制退出

這裡我們還是要聊一下 Cron 裡控制退出的經典寫法。我們把其他不相關的部分清理掉,只留下核心程式碼:

type Cron struct {
	stop    chan struct{}
}

func (c *Cron) Stop() {
	c.stop <- struct{}{}
}

func (c *Cron) run() {

	for {
		select {
		case <-c.stop:
			return // terminate go-routine.
		}
	}
}

空結構體能夠最大限度節省記憶體,畢竟我們只是需要一個訊號。核心邏輯用 for + select 的配合,這樣當我們需要結束時可以立刻響應。非常經典,建議大家日常有需要的時候採用。

結語

gron 整體程式碼其實只在 cron.go 和 schedule.go 兩個檔案,合起來程式碼不過 300 行,非常精巧,基本沒有冗餘,擴充套件性很好,是非常好的入門材料。

不過,作為一個 cron 的替代品,其實 gron 還是有自己的問題的。簡單講就是,如果我重啟了一個EC2範例,那麼我的 cron job 其實也還會繼續執行,這是落盤的,作業系統級別的支援。

但如果我執行 gron 的程序掛掉了,不好意思,那就完全涼了。你只有重啟,然後再把所有任務加回來才行。而我們既然要用 gron,是很有可能定一個幾天後,幾個星期後,幾個月後這樣的觸發器的。誰能保證程序一直活著呢?連機子本身都可能重啟。

所以,我們需要一定的機制來保證 gron 任務的可恢復性,將任務落盤,持久化狀態資訊,算是個思考題,這裡大家可以考慮一下怎麼做。

到此這篇關於一文詳解Golang 定時任務庫 gron 設計和原理的文章就介紹到這了,更多相關Golang   gron內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com