首頁 > 軟體

Go 實戰單佇列到優先順序佇列實現圖文範例

2022-07-18 14:02:19

優先順序佇列概述

佇列,是資料結構中實現先進先出策略的一種資料結構。而優先佇列則是帶有優先順序的佇列,即先按優先順序分類,然後相同優先順序的再 進行排隊。優先順序高的佇列中的元素會優先被消費。如下圖所示:

在Go中,可以定義一個切片,切片的每個元素代表一種優先順序佇列,切片的索引順序代表優先順序順序,後面程式碼實現部分我們會詳細講解。

為什麼需要優先順序佇列

先來看現實生活中的例子。銀行的辦事視窗,有普通視窗和vip視窗,vip視窗因為排隊人數少,等待的時間就短,比普通視窗就會優先處理。同樣,在登機口,就有貴賓通道和普通,同樣貴賓通道優先登機。

在網際網路中,當然就是請求和響應。使用優先順序佇列的作用是將請求按特定的屬性劃分出優先順序,然後按優先順序的高低進行優先處理。在研發服務的時候這裡有個隱含的約束條件就是伺服器資源(CPU、記憶體、頻寬等)是有限的。如果伺服器資源是無限的,那麼也就不需要佇列進行排隊了,來一個請求就立即處理一個請求就好了。所以,為了在最大限度的利用伺服器資源的前提下,將更重要的任務(優先順序高的請求)優先處理,以更好的服務使用者。

對於請求優先順序的劃分可以根據業務的特點根據價值高的優先原則來進行劃分即可。例如可以根據是否是否是會員、是否是VIP會員等屬性進行劃分優先順序。也可以根據是否是付費使用者進行劃分。在部落格的業務中,也可以根據是否是大V的屬性進行優先順序劃分。在網際網路廣告業務中,可以根據廣告位資源價值高低來劃分優先順序。

優先順序佇列實現原理

01 四個角色

在完整的優先順序佇列中有四個角色,分別是優先順序佇列、工作單元、消費者worker、通知channel

工作單元Job:佇列裡的元素。我們把每一次業務處理都封裝成一個工作單元,該工作單元會進入對應的優先順序佇列進行排隊,然後等待消費者worker來消費執行。優先順序佇列:按優先順序劃分的佇列,用來暫存對應優先順序的工作單元Job,相同優先順序的工作單元會在同一個佇列裡。noticeChan通道:當有工作單元進入優先順序佇列排隊後,會在通道里傳送一個訊息,以通知消費者worker從佇列中獲取元素(工作單元)進行消費。消費者worker:監聽noticeChan,當監聽到noticeChan有訊息時,說明佇列中有工作單元需要被處理,優先從高優先順序佇列中獲取元素進行消費。

02 佇列-消費者模式

根據佇列個數和消費者個數,我們可以將佇列-消費者模式分為單佇列-單消費者模式多佇列(優先順序佇列)- 單消費者模式多佇列(優先順序佇列)- 多消費者模式

我們先從最簡單的單佇列-單消費者模式實現,然後一步步演化成多佇列(優先順序佇列)-多消費者模式。

03 單佇列-單消費者模式實現

3.1 佇列的實現

我們先來看下佇列的實現。這裡我們用Golang中的List資料結果來實現,List資料結構是一個雙向連結串列,包含了將元素放到連結串列尾部、將頭部元素彈出的操作,符合佇列先進先出的特性。

好,我們看下具體的佇列的資料結構:

type JobQueue struct {
    mu sync.Mutex //佇列的操作需要並行安全
    jobList *list.List //List是golang庫的雙向佇列實現,每個元素都是一個job
    noticeChan chan struct{} //入隊一個job就往該channel中放入一個訊息,以供消費者消費
}

入隊操作

/**
 * 佇列的Push操作
 */
func (queue *JobQueue) PushJob(job Job) {
    queue.jobList.PushBack(job) //將job加到隊尾
    queue.noticeChan <- struct{}{}
}

到這裡有同學就會問了,為什麼不直接將job推播到Channel中,然後讓消費者依次消費不就行了麼?是的,單佇列這樣是可以的,因為我們最終目標是為了實現優先順序的多佇列,所以這裡即使是單佇列,我們也使用List資料結構,以便後續的演變

還有一點,大家注意到了,這裡入隊操作時有一個 這樣的操作:

queue.noticeChan <- struct{}{}

消費者監聽的實際上不是佇列本身,而是通道noticeChan。當有一個元素入隊時,就往noticeChan通道中輸入一條訊息,這裡是一個空結構體,主要作用就是通知消費者worker,佇列裡有要處理的元素了,可以從佇列中獲取了。 這個在後面演化成多佇列以及多消費者模式時會很有用。

出隊操作

根據佇列的先進先出原則,是要獲取佇列的最先進入的元素。Golang中List結構體的Front()函數是獲取連結串列的第一個元素,然後通過Remove函數將該元素從連結串列中移出,即得到了佇列中的第一個元素。這裡的Job結構體先不用關心,我們後面實現工作單元Job時,會詳細講解。

/**
 * 彈出佇列的第一個元素
 */
func (queue *JobQueue) PopJob() Job {
    queue.mu.Lock()
    defer queue.mu.Unlock()
    /**
     * 說明在佇列中沒有元素了
     */
    if queue.jobList.Len() == 0 {
        return nil
    }
    elements := queue.jobList.Front() //獲取隊裡的第一個元素
    return queue.jobList.Remove(elements).(Job) //將元素從佇列中移除並返回
}

等待通知操作

上面我們提到,消費者監聽的是noticeChan通道。當有元素入隊時,會往noticeChan中輸入一條訊息,以便通知消費者進行消費。如果佇列中沒有要消費的元素,那麼消費者就會阻塞在該通道上。

func (queue *JobQueue) WaitJob() <-chan struct{} {
    return queue.noticeChan
}

3.2 工作單元--Job的實現

一個工作單元就是一個要執行的任務。在系統中往往需要執行不同的任務,就是需要有不同型別的工作單元,但這些工作單元都有一組共同的執行流程。我們看下工作單元的類圖。

圖-job類圖

我們看下類圖中的幾個角色:

  • Job介面:定義了所有Job要實現的方法。
  • BaseJob類(結構體):定義了具體Job的基礎類別。因為具體Job類中的有共同的屬性和方法。所以抽象出一個基礎類別,避免重複實現。但該基礎類別對Execute方法沒有實現,因為不同的工作單元有具體的執行邏輯。
  • SquareJob和AreaJob類(結構體):是我們要具體實現的業務工作Job。主要是實現Execute的具體執行邏輯。根據業務的需要定義自己的工作Job和對應的Execute方法即可。

接下來,我們以計算一個int型別數位的平方的SquareJob為例來看下具體的實現。

  • BaseJob結構體

首先看下該結構體的定義

type BaseJob struct {
    Err error
    DoneChan chan struct{} //當作業完成時,或者作業被取消時,通知呼叫者
    Ctx context.Context
    cancelFunc context.CancelFunc
}

在該結構體中,我們主要關注DoneChan欄位就行,該欄位是當具體的Job的Execute執行完成後,來通知呼叫者的。

再來看Done函數,該函數就是在Execute函數完成後,要關閉DoneChan通道,以解除Job的阻塞而繼續執行其他邏輯。

/**
 * 作業執行完畢,關閉DoneChan,所有監聽DoneChan的接收者都能收到關閉的訊號
 */
func (job *BaseJob) Done() {
    close(job.DoneChan)
}

再來看WaitDone函數,該函數是當Job執行後,要等待Job執行完成,在未完成之前,DoneChan裡沒有訊息,通過該函數就能將job阻塞,直到Execute中呼叫了Done(),以便解除阻塞。

/**
 * 等待job執行完成
 */
func (job *BaseJob) WaitDone()  {
    select {
    case <-job.DoneChan:
        return
    }
}

SquareJob結構體

type SquareJob struct {
    *BaseJob
    x int
}

從結構體的定義中可知,SquareJob巢狀了BaseJob,所以該結構體擁有BaseJob的所有欄位和方法。在該結構體主要實現了Execute的邏輯:對x求平方。

func (s *SquareJob) Execute() error {
    result := s.x * s.x
    fmt.Println("the result is ", result)
    return nil
}

3.3 消費者Worker的實現

Worker主要功能是通過監聽佇列裡的noticeChan是否有需要處理的元素,如果有元素的話從佇列裡獲取到要處理的元素job,然後執行job的Execute方法。

我們將該結構體定位為WorkerManager,因為在後面我們講解多Worker模式時,會需要一個Worker的管理者,因此定義成了WorkerManager。

type WorkerManager struct {
    queue *JobQueue
    closeChan chan struct{}
}

StartWorker函數,只有一個for迴圈,不斷的從佇列中獲取Job。獲取到Job後,進行消費Job,即ConsumeJob。

 func (m *WorkerManager) StartWork() error {
    fmt.Println("Start to Work")
    for {
        select {
            case <-m.closeChan:
                return nil
            case <-m.queue.noticeChan:
                job := m.queue.PopJob()
                m.ConsumeJob(job)
        }
    }
    return nil
}
func (m *WorkerManager) ConsumeJob(job Job) {
    defer func() {
        job.Done()
    }()
    job.Execute()
}

到這裡,單佇列-單消費者模式中各角色的實現就講解完了。我們通過main函數將其關聯起來。

func main() {
    //初始化一個佇列
    queue := &JobQueue{
        jobList: list.New(),
        noticeChan: make(chan struct{}, 10),
    }
    //初始化一個消費worker
    workerManger := NewWorkerManager(queue)
    // worker開始監聽佇列
    go workerManger.StartWork()
    // 構造SquareJob
    job := &SquareJob{
        BaseJob: &BaseJob{
            DoneChan: make(chan struct{}, 1),
        },
        x: 5,
    }
    //壓入佇列尾部
    queue.PushJob(job)
    //等待job執行完成
    job.WaitDone()
    print("The End")
}

04 多佇列-單消費者模式

有了單佇列-單消費者的基礎,我們如何實現多佇列-單消費者模式。也就是優先順序佇列。

優先順序的佇列,實質上就是根據工作單元Job的優先順序屬性,將其放到對應的優先順序佇列中,以便worker可以根據優先順序進行消費。我們要在Job結構體中增加一個Priority屬性。因為該屬性是所有Job都共有的,因此定義在BaseJob上更合適.

type BaseJob struct {
    Err error
    DoneChan chan struct{} //當作業完成時,或者作業被取消時,通知呼叫者
    Ctx context.Context
    cancelFunc context.CancelFunc
    priority int //工作單元的優先順序
}

我們再來看看多佇列如何實現。實際上就是用一個切片來儲存各個佇列,切片的每個元素儲存一個JobQueue佇列元素即可。

var queues = make([]*JobQueue, 10, 100)

那各優先順序的佇列在切片中是如何儲存的呢?切片索引順序只代表優先順序的高於低,不代表具體是哪個優先順序。

什麼意思呢?假設我們現在對目前的工作單元定義了1、4、7三個優先順序。這3個優先順序在切片中是按優先順序從小到到依次儲存在queues切片中的,如下圖:

圖-正確的切片儲存的優先順序

那為什麼不讓切片的索引就代表優先順序,讓優先順序為1的佇列儲存在索引1處,優先順序4的佇列儲存在索引4處,優先順序7的佇列儲存在索引7處呢?如果這樣儲存的話,就會變成如下這樣:

圖4-直接使用索引作為優先順序缺點

可見如果我們設定的優先順序不是連續的,那麼就會造成空間的浪費。所以,我們是將佇列按優先順序高低依次存放到了切片中。

那既然這樣,當一個優先順序的job來了之後,我該怎麼知道該優先順序的佇列是儲存在哪個索引中呢?我們用一個map來對映優先順序和切片索引之間的關係。這樣當一個工作單元Job入隊的時候,以優先順序為key,就可以查詢到對應優先順序的佇列儲存在切片的哪個位置了。如下圖所示:

圖-優先順序和索引對映

程式碼定義:

var priorityIdx map[int][int]//該map的key是優先順序,value代表的是queues切片的索引

好了,我們重新定義一下佇列的結構體:

type PriorityQueue struct {
    mu sync.Mutex
    noticeChan chan struct{}
    queues []*JobQueue
    priorityIdx map[int]int
}
//原來的JobQueue會變成如下這樣:
type JobQueue struct {
    priority int //代表該佇列是哪種優先順序的佇列
    jobList *list.List //List是golang庫的雙向佇列實現,每個元素都是一個job
}

這裡我們注意到有以下幾個變化:

JobQueue裡多了一個Priority屬性,代表該佇列是哪個優先順序別。noticeChan屬性從JobQueue中移動到了PriorityQueue中。因為現在有多個佇列,只要任意一個佇列裡有元素就需要通知消費者worker進行消費,因此消費者worker監聽的是PriorityQueue中是否有元素,而在監聽階段不關心具體哪個優先順序佇列中有元素。

好了,資料結構定義完了,我們看看將工作單元Job推入佇列和從佇列中彈出Job又有什麼變化。

優先順序佇列的入隊操作

優先順序佇列的入隊操作,就需要根據入隊Job的優先順序屬性放到對應的優先順序佇列中,入隊流程圖如下:

圖-優先順序佇列入隊流程

當一個Job加入佇列的時候,有兩種場景,一種是該優先順序的佇列已經存在,則直接Push到隊尾即可。一種是該優先順序的佇列還不存在,則需要先建立該優先順序的佇列,然後再將該工作單元Push到隊尾。如下是兩種場景。

佇列已經存在的場景

這種場景會比較簡單。假設我們要插入優先順序為7的工作單元,首先從對映表中查詢7是否存在,發現對應關係是2,則直接找到切片中索引2的元素,即優先順序為7的佇列,將job加入即可。如下圖。

圖-已存在佇列插入

佇列不存在的場景

這種場景稍微複雜些,在對映表中找不到要插入優先順序的佇列的話,則需要在切片中插入一個優先順序佇列,而為了優先順序佇列在切片中也保持有序(保持有序就可以知道佇列的優先順序的高低了),則需要移動相關的元素。我們以插入優先順序為6的工作單元為例來講解。

1、首先,我們的佇列有一個初始化的狀態,儲存了優先順序1、4、7的佇列。如下圖。

2、當插入優先順序為6的工作單元時,發現在對映表中沒有優先順序6的對映關係,說明在切片中還沒有優先順序為6的佇列的元素。所以需要在切片中依次查詢到優先順序6應該插入的位置在4和7之間,也就是需要儲存在切片2的位置。

3、將原來索引2位置的優先順序為7的佇列往後移動到3,同時更新對映表中的對應關係。

4、將優先順序為6的工作單元插入到索引2的佇列中,同時更新對映表中的優先順序和索引的關係。

我們看下程式碼實現:

func (priorityQueue *PriorityQueue) Push(job Job) {
    priorityQueue.mu.Lock()
    defer priorityQueue.mu.Unlock()
    //先根據job的優先順序找要入隊的佇列
    var idx int
    var ok bool
    //從優先順序-切片索引的map中查詢該優先順序的佇列是否存在
    if idx, ok = priorityQueue.priorityIdx[job.Priority()]; !ok {
        //如果不存在該優先順序的佇列,則需要初始化一個佇列,並返回該佇列在切片中的索引位置
        idx = priorityQueue.addPriorityQueue(job.Priority)
    }
    //根據獲取到的切片索引idx,找到具體的佇列
    queue := priority.queues[idx]
    //將job推播到佇列的隊尾
    queue.JobList.PushBack(job)
    //佇列job個數+1
    priorityQueue.Size++
    //如果佇列job個數超過佇列的最大容量,則從優先順序最低的佇列中移除工作單元
    if priorityQueue.size > priorityQueue.capacity {
        priorityQueue.RemoveLeastPriorityJob()
    }else {
        //通知新進來一個job
        priorityQueue.noticeChan <- struct{}{}
    }
}

程式碼中大部分也都做了註釋,不難理解。這裡我們來看下addPriorityQueue的具體實現:

func (priorityQueue *PriorityQueue) addPriorityQueue(priority int) int {
    n := len(priorityQueue.queues)
    //通過二分查詢找到priority應插入的切片索引
    pos := sort.Search(n, func(i int) bool {
        return priority < priorityQueue.priority
    })
    //更新對映表中優先順序和切片索引的對應關係
    for i := pos; i < n; i++ {
        priorityQueue.priorityIdx[priorityQueue.queues[i].priority] = i + 1
    }
    tail := make([]*jobQueue, n-pos)
    copy(tail, priorityQueue.queues[pos:])
    //初始化一個新的優先順序佇列,並將該元素放到切片的pos位置中
    priorityQueue.queues = append(priorityQueue.queues[0:pos], newJobQueue(priority))
    //將高於priority優先順序的元素也拼接到切片後面
    priorityQueue.queues = append(priorityQueue.queues, tail...) 
    return pos
}

最後,我們再來看一個實際的呼叫例子:

func main() {
    //初始化一個佇列
    queue := &PriorityQueue{
        noticeChan: make(chan struct{}, cap),
        capacity: cap,
        priorityIdx: make(map[int]int),
        size: 0,
    }
    //初始化一個消費worker
    workerManger := NewWorkerManager(queue)
    // worker開始監聽佇列
    go workerManger.StartWork()
    // 構造SquareJob
    job := &SquareJob{
        BaseJob: &BaseJob{
            DoneChan: make(chan struct{}, 1),
        },
        x: 5,
        priority: 10,
    }
    //壓入佇列尾部
    queue.PushJob(job)
    //等待job執行完成
    job.WaitDone()
    print("The End")
}

05 多佇列-多消費者模式

我們在多佇列-單消費者的基礎上,再來看看多消費者模式。也就是增加worker的數量,提高Job的處理速度。

我們再來看下worker的定義:

type WorkerManager struct {
    queue *PriorityQueue
    closeChans []chan struct{}
}

這裡需要注意,closeChans變成了切片陣列。因為我們每啟動一個worker,就需要有一個關閉通道。

然後看StartWorker函數的實現:

 func (m *WorkerManager) StartWork(n int) error {
    fmt.Println("Start to Work")
    for i := 0; i < n; i++ {
        m.createWorker();
    }
    return nil
}
func (m *WorkerManager) createWorker() {
    closeChan := make(chan struct{})
    //每個協程,就是一個worker
    go func(closeChan chan struct{}) {
        var job Job
        for {
                select {
                    case <-m.closeChan:
                        return nil
                    case <-m.queue.noticeChan:
                        job := m.queue.PopJob()
                        m.ConsumeJob(job)
                }   
        }
    }(closeChan)
    m.closeChanMu.Lock()
    defer m.closeChanMu.Unlock()
    m.closeChans = append(m.closeChans, closeChan)
    return nil
}
func (m *WorkerManager) ConsumeJob(job Job) {
    defer func() {
        job.Done()
    }()
    job.Execute()
}

這裡需要注意的是,所有的worker都需要監聽佇列的noticeChan通道。測試的例子就留給讀者自己了。

另外如下圖的單佇列-多消費者模式是多佇列-多消費者模式的一個特例,這裡就不再進行實現了。

總結

佇列的作用可以用來控制流量,而優先順序佇列在兼顧流量控制的同時,還能將流量按優先順序高低來進行處理。 本文中一些細節的並行加鎖操作做了忽略,大家在實際應用中根據需要進行完善即可,更多關於Go 單佇列優先順序佇列的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com