首頁 > 軟體

Go http請求排隊處理實戰範例

2022-07-18 14:03:17

一、http請求的順序處理方式

在高並行場景下,為了降低系統壓力,都會使用一種讓請求排隊處理的機制。本文就介紹在Go中是如何實現的。

首先,我們看下正常的請求處理邏輯。 使用者端傳送請求,web server接收請求,然後就是處理請求,最後響應給使用者端這樣一個順序的邏輯。如下圖所示:

程式碼實現如下:

package main
import (
	"fmt"
	"net/http"
)
func main() {
	myHandler := MyHandler{}
	http.Handle("/", &myHandler)
	http.ListenAndServe(":8080", nil)
}
type MyHandler struct {
}
func (h *MyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("Hello Go"))
}

在瀏覽器中輸入 http://localhost:8080/,就能在頁面上顯示出“Hello Go”的頁面來。

通常情況下,大家在開發web系統的時候,一般都是這麼處理請求。接下來我們看在高並行下如何實現讓請求進行排隊處理。

二、http請求的非同步處理方式--排隊處理

讓http請求進入到佇列,我們也稱為非同步處理方式。其基本思想就是將接收到的請求的上下文(即request和response)以及處理邏輯包裝成一個工作單元,然後將其放到佇列,然後該工作單元等待消費的工作執行緒處理該job,處理完成後再返回給使用者端。 流程如下圖:

該實現中會有三個關鍵的元素:工作執行單元、佇列、消費者。下面我們逐一看下各自的職責及實現。

工作單元

該工作單元主要是封裝請求的上下文資訊(request和response)、請求的處理邏輯以及該工作單元是否被執行完成的狀態。

請求的處理邏輯實際上就是原來在順序處理流程中的具體函數,如果是mvc模式的話就是controller裡的一個具體的action。

在Go中實現通訊的方式一般是使用通道。所以,在工作單元中有一個通道,當該工作單元執行完具體的處理邏輯後,就往該通道中寫入一個訊息,以通知主協程該次請求已完成,可以返回給使用者端了。

所以,一個http請求的處理邏輯看起來就像是下面這樣:

func (h *MyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  將w和r包裝成工作單元job
  將job入隊
  等待job執行完成
  本次請求處理完畢
}

下面我們看下工作單元的具體實現,這裡我們將其定義為一個Job結構體:

type Job struct {
    DoneChan  chan struct{}
    handleJob func(j FlowJob) error //具體的處理邏輯
}

Job結構體中有一個handleJob,其型別是一個函數,即處理請求的邏輯部分。DoneChan通道用來讓該單元進行阻塞等待,並當handleJob執行完畢後傳送訊息通知的。

下面我們再看看該Job的相關行為:

// 消費者從佇列中取出該job時 執行具體的處理邏輯
func (job *Job) Execute() error {
    fmt.Println("job start to execute ")
    return job.handleJob(job)
}
// 執行完Execute後,呼叫該函數以通知主執行緒中等待的job
func (job *Job) Done() {
    job.DoneChan <- struct{}{}
    close(job.DoneChan)
}
// 工作單元等待自己被消費
func (job *Job) WaitDone() {
    select {
    case <-job.DoneChan:
	return
    }
}

佇列

佇列主要是用來儲存工作單元的。是處理請求的主協程和消費協程之間的紐帶。佇列具有列表、容量、當前元素個數等關鍵元素組成。如下:

type JobQueue struct {
    mu         sync.Mutex
    noticeChan chan struct{}
    queue      *list.List
    size       int
    capacity   int
}

其行為主要有入隊、出隊、移除等操作。定義如下:

// 初始化佇列
func NewJobQueue(cap int) *JobQueue {
    return &JobQueue{
	capacity: cap,
	queue:    list.New(),
	noticeChan: make(chan struct{}, 1),
    }
}
// 工作單元入隊
func (q *JobQueue) PushJob(job *Job) {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.size++
    if q.size > q.capacity {
	q.RemoveLeastJob()
    }
    q.queue.PushBack(job)
    q.noticeChan <- struct{}{}
}
// 工作單元出隊
func (q *JobQueue) PopJob() *Job {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.size == 0 {
		return nil
	}
	q.size--
	return q.queue.Remove(q.queue.Front()).(*Job)
}
// 移除佇列中的最後一個元素。
// 一般在容量滿時,有新job加入時,會移除等待最久的一個job
func (q *JobQueue) RemoveLeastJob() {
	if q.queue.Len() != 0 {
		back := q.queue.Back()
		abandonJob := back.Value.(*Job)
		abandonJob.Done()
		q.queue.Remove(back)
	}
}
// 消費執行緒監聽佇列的該通道,檢視是否有新的job需要消費
func (q *JobQueue) waitJob() <-chan struct{} {
    return q.noticeChan
}

這裡我們主要解釋一下入隊的操作流程:

  • 1 首先是佇列的元素個數size++
  • 2 判斷size是否超過最大容量capacity
  • 3 若超過最大容量,則將佇列中最後一個元素移除。因為該元素等待時間最長,認為是超時的情況。
  • 4 將新接收的工作單元放入到隊尾。
  • 5 往noticeChan通道中寫入一個訊息,以便通知消費協程處理Job。

由以上可知,noticeChan是佇列和消費者協程之間的紐帶。下面我們來看看消費者的實現。

消費者協程

消費者協程的職責是監聽佇列,並從佇列中獲取工作單元,執行工作單元的具體處理邏輯。在實際應用中,可以根據系統的承載能力啟用多個消費協程。在本文中,為了方便講解,我們只啟用一個消費協程。

我們定義一個WorkerManager結構體,負責管理具體的消費協程。該WorkerManager有一個屬性是工作佇列,所有啟動的消費協程都需要從該工作佇列中獲取工作單元。程式碼實現如下:

type WorkerManager struct {
    jobQueue *JobQueue
}
func NewWorkerManager(jobQueue *JobQueue) *WorkerManager {
    return &WorkerManager{
	jobQueue: jobQueue,
    }
}
func (m *WorkerManager) createWorker() error {
    go func() {
	fmt.Println("start the worker success")
	var job FlowJob
	for {
            select {
                case <-m.jobQueue.waitJob():
		fmt.Println("get a job from job queue")
                job = m.jobQueue.PopJob()
		fmt.Println("start to execute job")
		job.Execute()
                fmt.Print("execute job done")
		job.Done()
            }
	}
    }()
    return nil
}

在程式碼中我們可以看到,createWorker中的邏輯實際是一個for迴圈,然後通過select監聽佇列的noticeChan通道,當獲取到工作單元時,就執行工作單元中的handleJob方法。執行完後,通過job.Done()方法通知在主協程中還等待的job。這樣整個流程就形成了閉環。

完整程式碼

我們現在看下整體的處理流程,如下圖:

現在我們寫一個測試demo。在這裡我們定義了一個全域性的flowControl結構體,以作為佇列和工作協程的管理。程式碼如下:

package main
import (
    "container/list"
    "fmt"
    "net/http"
    "sync"
)
func main() {
    flowControl := NewFlowControl()
    myHandler := MyHandler{
	flowControl: flowControl,
    }
    http.Handle("/", &myHandler)
    http.ListenAndServe(":8080", nil)
}
type MyHandler struct {
    flowControl *FlowControl
}
func (h *MyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	fmt.Println("recieve http request")
	job := &Job{
            DoneChan: make(chan struct{}, 1),
            handleJob: func(job *Job) error {
		w.Header().Set("Content-Type", "application/json")
		w.Write([]byte("Hello World"))
		return nil
            },
	}
	h.flowControl.CommitJob(job)
	fmt.Println("commit job to job queue success")
	job.WaitDone()
}
type FlowControl struct {
    jobQueue *JobQueue
    wm       *WorkerManager
}
func NewFlowControl() *FlowControl {
    jobQueue := NewJobQueue(10)
    fmt.Println("init job queue success")
    m := NewWorkerManager(jobQueue)
    m.createWorker()
    fmt.Println("init worker success")
    control := &FlowControl{
	jobQueue: jobQueue,
	wm:       m,
    }
    fmt.Println("init flowcontrol success")
    return control
}
func (c *FlowControl) CommitJob(job *Job) {
    c.jobQueue.PushJob(job)
    fmt.Println("commit job success")
}

之前有一篇文章是優先順序佇列,實際上就是該佇列的高階實現版本,可以將不同的請求按優先順序分配到不同的佇列中。有興趣的同學可參考:Go實戰 單佇列到優先順序佇列的實現

總結

通過將請求的上下文資訊封裝到一個工作單元中,並將其放入到佇列中,然後通過訊息通道的方式阻塞等待消費者執行完畢。同時在佇列中通過設定佇列的容量以解決請求過多而給系統造成壓力的問題。

以上就是Go http請求排隊處理實戰的詳細內容,更多關於Go http請求排隊的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com