<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在高並行場景下,為了降低系統壓力,都會使用一種讓請求排隊處理的機制。本文就介紹在Go中是如何實現的。
首先,我們看下正常的請求處理邏輯。 使用者端傳送請求,web server接收請求,然後就是處理請求,最後響應給使用者端這樣一個順序的邏輯。如下圖所示:
程式碼實現如下:
package main import ( "fmt" "net/http" ) func main() { myHandler := MyHandler{} http.Handle("/", &myHandler) http.ListenAndServe(":8080", nil) } type MyHandler struct { } func (h *MyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Write([]byte("Hello Go")) }
在瀏覽器中輸入 http://localhost:8080/,就能在頁面上顯示出“Hello Go”的頁面來。
通常情況下,大家在開發web系統的時候,一般都是這麼處理請求。接下來我們看在高並行下如何實現讓請求進行排隊處理。
讓http請求進入到佇列,我們也稱為非同步處理方式。其基本思想就是將接收到的請求的上下文(即request和response)以及處理邏輯包裝成一個工作單元,然後將其放到佇列,然後該工作單元等待消費的工作執行緒處理該job,處理完成後再返回給使用者端。 流程如下圖:
該實現中會有三個關鍵的元素:工作執行單元、佇列、消費者。下面我們逐一看下各自的職責及實現。
該工作單元主要是封裝請求的上下文資訊(request和response)、請求的處理邏輯以及該工作單元是否被執行完成的狀態。
請求的處理邏輯實際上就是原來在順序處理流程中的具體函數,如果是mvc模式的話就是controller裡的一個具體的action。
在Go中實現通訊的方式一般是使用通道。所以,在工作單元中有一個通道,當該工作單元執行完具體的處理邏輯後,就往該通道中寫入一個訊息,以通知主協程該次請求已完成,可以返回給使用者端了。
所以,一個http請求的處理邏輯看起來就像是下面這樣:
func (h *MyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
將w和r包裝成工作單元job
將job入隊
等待job執行完成
本次請求處理完畢
}
下面我們看下工作單元的具體實現,這裡我們將其定義為一個Job結構體:
type Job struct { DoneChan chan struct{} handleJob func(j FlowJob) error //具體的處理邏輯 }
Job結構體中有一個handleJob,其型別是一個函數,即處理請求的邏輯部分。DoneChan通道用來讓該單元進行阻塞等待,並當handleJob執行完畢後傳送訊息通知的。
下面我們再看看該Job的相關行為:
// 消費者從佇列中取出該job時 執行具體的處理邏輯 func (job *Job) Execute() error { fmt.Println("job start to execute ") return job.handleJob(job) } // 執行完Execute後,呼叫該函數以通知主執行緒中等待的job func (job *Job) Done() { job.DoneChan <- struct{}{} close(job.DoneChan) } // 工作單元等待自己被消費 func (job *Job) WaitDone() { select { case <-job.DoneChan: return } }
佇列主要是用來儲存工作單元的。是處理請求的主協程和消費協程之間的紐帶。佇列具有列表、容量、當前元素個數等關鍵元素組成。如下:
type JobQueue struct { mu sync.Mutex noticeChan chan struct{} queue *list.List size int capacity int }
其行為主要有入隊、出隊、移除等操作。定義如下:
// 初始化佇列 func NewJobQueue(cap int) *JobQueue { return &JobQueue{ capacity: cap, queue: list.New(), noticeChan: make(chan struct{}, 1), } } // 工作單元入隊 func (q *JobQueue) PushJob(job *Job) { q.mu.Lock() defer q.mu.Unlock() q.size++ if q.size > q.capacity { q.RemoveLeastJob() } q.queue.PushBack(job) q.noticeChan <- struct{}{} } // 工作單元出隊 func (q *JobQueue) PopJob() *Job { q.mu.Lock() defer q.mu.Unlock() if q.size == 0 { return nil } q.size-- return q.queue.Remove(q.queue.Front()).(*Job) } // 移除佇列中的最後一個元素。 // 一般在容量滿時,有新job加入時,會移除等待最久的一個job func (q *JobQueue) RemoveLeastJob() { if q.queue.Len() != 0 { back := q.queue.Back() abandonJob := back.Value.(*Job) abandonJob.Done() q.queue.Remove(back) } } // 消費執行緒監聽佇列的該通道,檢視是否有新的job需要消費 func (q *JobQueue) waitJob() <-chan struct{} { return q.noticeChan }
這裡我們主要解釋一下入隊的操作流程:
由以上可知,noticeChan是佇列和消費者協程之間的紐帶。下面我們來看看消費者的實現。
消費者協程的職責是監聽佇列,並從佇列中獲取工作單元,執行工作單元的具體處理邏輯。在實際應用中,可以根據系統的承載能力啟用多個消費協程。在本文中,為了方便講解,我們只啟用一個消費協程。
我們定義一個WorkerManager結構體,負責管理具體的消費協程。該WorkerManager有一個屬性是工作佇列,所有啟動的消費協程都需要從該工作佇列中獲取工作單元。程式碼實現如下:
type WorkerManager struct { jobQueue *JobQueue } func NewWorkerManager(jobQueue *JobQueue) *WorkerManager { return &WorkerManager{ jobQueue: jobQueue, } } func (m *WorkerManager) createWorker() error { go func() { fmt.Println("start the worker success") var job FlowJob for { select { case <-m.jobQueue.waitJob(): fmt.Println("get a job from job queue") job = m.jobQueue.PopJob() fmt.Println("start to execute job") job.Execute() fmt.Print("execute job done") job.Done() } } }() return nil }
在程式碼中我們可以看到,createWorker中的邏輯實際是一個for迴圈,然後通過select監聽佇列的noticeChan通道,當獲取到工作單元時,就執行工作單元中的handleJob方法。執行完後,通過job.Done()方法通知在主協程中還等待的job。這樣整個流程就形成了閉環。
我們現在看下整體的處理流程,如下圖:
現在我們寫一個測試demo。在這裡我們定義了一個全域性的flowControl結構體,以作為佇列和工作協程的管理。程式碼如下:
package main import ( "container/list" "fmt" "net/http" "sync" ) func main() { flowControl := NewFlowControl() myHandler := MyHandler{ flowControl: flowControl, } http.Handle("/", &myHandler) http.ListenAndServe(":8080", nil) } type MyHandler struct { flowControl *FlowControl } func (h *MyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { fmt.Println("recieve http request") job := &Job{ DoneChan: make(chan struct{}, 1), handleJob: func(job *Job) error { w.Header().Set("Content-Type", "application/json") w.Write([]byte("Hello World")) return nil }, } h.flowControl.CommitJob(job) fmt.Println("commit job to job queue success") job.WaitDone() } type FlowControl struct { jobQueue *JobQueue wm *WorkerManager } func NewFlowControl() *FlowControl { jobQueue := NewJobQueue(10) fmt.Println("init job queue success") m := NewWorkerManager(jobQueue) m.createWorker() fmt.Println("init worker success") control := &FlowControl{ jobQueue: jobQueue, wm: m, } fmt.Println("init flowcontrol success") return control } func (c *FlowControl) CommitJob(job *Job) { c.jobQueue.PushJob(job) fmt.Println("commit job success") }
之前有一篇文章是優先順序佇列,實際上就是該佇列的高階實現版本,可以將不同的請求按優先順序分配到不同的佇列中。有興趣的同學可參考:Go實戰 單佇列到優先順序佇列的實現
通過將請求的上下文資訊封裝到一個工作單元中,並將其放入到佇列中,然後通過訊息通道的方式阻塞等待消費者執行完畢。同時在佇列中通過設定佇列的容量以解決請求過多而給系統造成壓力的問題。
以上就是Go http請求排隊處理實戰的詳細內容,更多關於Go http請求排隊的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45