首頁 > 軟體

Go語言非同步API設計的扇入扇出模式詳解

2022-08-05 22:01:32

前言

扇出/扇入模式是更高階 API 整合的主要內容。這些應用程式並不總是表現出相同的可用性或效能特徵。

扇出是從電子工程中借用的一個術語,它描述了輸入的邏輯閘連線到另一個輸出門的數量。輸出需要提供足夠的電流來驅動所有連線的輸入。在事務處理系統中,用來描述為了服務一個輸入請求而需要做的請求總數。

扇入是指為邏輯單元的輸入方程提供輸入訊號的最大數量。扇入是定義單個邏輯閘可以接受的最大數位輸入數量的術語。大多數電晶體-電晶體邏輯 (TTL) 門有一個或兩個輸入,儘管有些有兩個以上。典型的邏輯閘具有 1 或 2 的扇入。

扇入/扇出服務

我們舉一個現實世界的例子,一個電子商務網站將自己與一個第三方支付閘道器整合在一起。 這裡,網站使用支付閘道器的 API 來彈出支付螢幕並輸入安全證書。同時,網站可能會呼叫另一個稱為分析的 API 來記錄支付的嘗試。這種將一個請求分叉成多個請求的過程被稱為 fan-out 扇出。在現實世界中,一個客戶請求可能涉及許多扇出服務。

另一個例子是 MapReduce。Map 是一個扇入的操作,而 Reduce 是一個扇出的 操作。一個伺服器可以將一個資訊扇出到下一組服務(API),並忽略結果。或者可以等到這些伺服器的所有響應都返回。如 如下圖所示,一個傳入的請求被伺服器複用為轉換成兩個傳出的請求:

扇入 fan-in 是一種操作,即兩個或更多傳入的請求會聚成一個請求。這種情況下,API如何聚合來自多個後端服務的結果,並將結果即時返回給客戶。

例如,想想一個酒店價格聚合器或航班票務聚合器,它從不同的資料提供者那裡獲取關於多個酒店或航班的請求資訊並顯示出來。

下圖顯示了扇出操作是如何結合多個請求並準備一個最終的響應,由使用者端消費的。

使用者端也可以是一個伺服器,為更多的客戶提供服務。如上圖所示,左側的伺服器正在收集來自酒店 A、酒店 B 和 航空公司供應商 A,併為不同的客戶準備另一個響應。

因此,扇入和扇出操作並不總是完全相互獨立的。大多數情況下,它將是一個混合場景,扇入和扇出操作都是相互配合的。

請記住,對下一組伺服器的扇出操作可以是非同步的。也是如此。對於扇入請求來說,這可能不是真的。扇入操作有時被稱為 API 呼叫。

Go 語言實現扇入/扇出模式

Fan-out:多個 goroutine 從同一個通道讀取資料,直到該通道關閉。OUT 是一種張開的模式,所以又被稱為扇出,可以用來分發任務。

Fan-in:1 個 goroutine 從多個通道讀取資料,直到這些通道關閉。IN 是一種收斂的模式,所以又被稱為扇入,用來收集處理的結果。

package main
import (
	"context"
	"log"
	"sync"
	"time"
)
// Task 包含任務編號及任務所需時長
type Task struct {
	Number int
	Cost   time.Duration
}
// task channel 生成器
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
	taskCh := make(chan Task)
	go func() {
		defer close(taskCh)
		for _, task := range taskList {
			select {
			case <-ctx.Done():
				return
			case taskCh <- task:
			}
		}
	}()
	return taskCh
}
// doTask 處理並返回已處理的任務編號作為通道的函數
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
	doneTaskCh := make(chan int)
	go func() {
		defer close(doneTaskCh)
		for task := range taskCh {
			select {
			case <-ctx.Done():
				return
			default:
				log.Printf("do task number: %dn", task.Number)
				// task 任務處理
				// 根據任務耗時休眠
				time.Sleep(task.Cost)
				doneTaskCh <- task.Number // 已處理任務的編號放入通道
			}
		}
	}()
	return doneTaskCh
}
// `fan-in` 意味著將多個資料流複用或合併成一個流。
// merge 函數接收引數傳遞的多個通道 「taskChs」,並返回單個通道 「<-chan int」
func merge(ctx context.Context, taskChs []<-chan int) <-chan int {
	var wg sync.WaitGroup
	mergedTaskCh := make(chan int)
	mergeTask := func(taskCh <-chan int) {
		defer wg.Done()
		for t := range taskCh {
			select {
			case <-ctx.Done():
				return
			case mergedTaskCh <- t:
			}
		}
	}
	wg.Add(len(taskChs))
	for _, taskCh := range taskChs {
		go mergeTask(taskCh)
	}
	// 等待所有任務處理完畢
	go func() {
		wg.Wait()
		close(mergedTaskCh)
	}()
	return mergedTaskCh
}
func main() {
	start := time.Now()
	// 使用 context 來防止 goroutine 洩漏,即使在處理過程中被中斷
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// taskList 定義每個任務及其成本
	taskList := []Task{
		Task{1, 1 * time.Second},
		Task{2, 7 * time.Second},
		Task{3, 2 * time.Second},
		Task{4, 3 * time.Second},
		Task{5, 5 * time.Second},
		Task{6, 3 * time.Second},
	}
	// taskChannelGerenator 是一個函數,它接收一個 taskList 並將其轉換為 Task 型別的通道
	// 執行結果(int slice channel)儲存在 worker 中
	// 由於 doTask 的結果是一個通道,被分給了多個 worker,這就對應了 fan-out 處理
	taskCh := taskChannelGerenator(ctx, taskList)
	numWorkers := 4
	workers := make([]<-chan int, numWorkers)
	for i := 0; i < numWorkers; i++ {
		workers[i] = doTask(ctx, taskCh)  // doTask 處理並返回已處理的任務編號作為通道的函數
	}
	count := 0
	for d := range merge(ctx, workers) { // merge 從中讀取已處理的任務編號
		count++
		log.Printf("done task number: %dn", d)
	}
	log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}

參考連結:

Fan-in/fan-out of services

Understanding the Fan-Out/Fan-In API Integration Pattern

以上就是Go語言非同步API設計的扇入扇出模式詳解的詳細內容,更多關於Go非同步API扇入扇出模式的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com