作者 | 馬超 責編 | 張紅月出品 | CSDN部落格Serverless的核心理念就是函數式計算,開發者無需再關注具體的模組,雲上部署的粒度變成了程式函數,自動伸縮、擴容等工作完全
2021-06-16 03:03:40
Serverless的核心理念就是函數式計算,開發者無需再關注具體的模組,雲上部署的粒度變成了程式函數,自動伸縮、擴容等工作完全由雲服務負責。
Serverless Computing,即」無伺服器計算」,其實這一概念在剛剛提出的時候並沒有獲得太多的關注,直到2014年AWS Lambda這一里程碑式的產品出現。Serverless算是正式走進了雲端計算的舞臺。2018年5月,Google在KubeCon+CloudNative 2018期間開源了gVisor容器沙箱運行時並分享了它的設計理念和原則。隨後2018年的Google Next大會上Google推出了自己的 Google Serverless平臺 —— gVisor。同年AWS又放了顆大炮仗-Firecracker,這是一款基於Rust語言編寫的安全沙箱基礎元件,用於函數計算服務Lambda和託管的容器服務。
值得注意的是Google也並沒有死守自己一手締造的Go語言平臺,而是選擇了Go與Rust的模式,據說Google在Rust方面也開始招兵買馬,也要用Rust重寫之前基於Go編寫的Serverless平臺。
筆者寫本文的初衷,其實就是要回答為什麼在這個高併發大行其道的時代,以效能著稱的C語言和以安全高效聞名的Java都不香了呢?
高併發模式初探
在這個高併發時代最重要的設計模式無疑是生產者、消費者模式,比如著名的訊息佇列kafka其實就是一個生產者消費者模式的典型實現。其實生產者消費者問題,也就是有限緩衝問題,可以用以下場景進行簡要描述,生產者生成一定量的產品放到庫房,並不斷重複此過程;與此同時,消費者也在緩衝區消耗這些資料,但由於庫房大小有限,所以生產者和消費者之間步調協調,生產者不會在庫房滿的情況放入埠,消費者也不會在庫房空時消耗資料。詳見下圖:
而如果在生產者與消費者之間完美協調並保持高效,這就是高併發要解決的本質問題。
C語言的高併發案例
筆者曾經介紹過 TDEngine 的相關程式碼,其中 Sheduler 模組的相關排程演算法就使用了生產、消費者模式進行訊息傳遞功能的實現,也就是有多個生產者(producer)生成並不斷向佇列中傳遞訊息,也有多個消費者(consumer)不斷從佇列中取訊息。
後面我們也會說明類型功能在Go、Java 等高階語言中類似的功能已經被封裝好了,但是在C語言中你就必須要用好互斥體( mutex)和訊號量(semaphore)並協調他們之間的關係。由於C語言的實現是最複雜的,先來看結構體設計和他的註釋:
typedef struct {
char label[16];//訊息內容
sem_t emptySem;//此訊號量代表隊列的可寫狀態
sem_t fullSem;//此訊號量代表隊列的可讀狀態
pthread_mutex_t queueMutex;//此互斥體為保證訊息不會被誤修改,保證執行緒程安全
int fullSlot;//隊尾位置
int emptySlot;//隊頭位置
int queueSize;#佇列長度
int numOfThreads;//同時操作的執行緒數量
pthread_t * qthread;//執行緒指針
SSchedMsg * queue;//佇列指針
} SSchedQueue;
再來看Shceduler初始化函數,這裡需要特別說明的是,兩個訊號量的創建,其中emptySem是佇列的可寫狀態,初始化時其值為queueSize,即初始時佇列可寫,可接受訊息長度為佇列長度,fullSem是佇列的可讀狀態,初始化時其值為0,即初始時佇列不可讀。具體程式碼及我的註釋如下:
void *taosInitScheduler(int queueSize, int numOfThreads, char *label) {
pthread_attr_t attr;
SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
memset(pSched, 0, sizeof(SSchedQueue));
pSched->queueSize = queueSize;
pSched->numOfThreads = numOfThreads;
strcpy(pSched->label, label);
if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno));
goto _error;
}
//emptySem是佇列的可寫狀態,初始化時其值為queueSize,即初始時佇列可寫,可接受訊息長度為佇列長度。
if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno));
goto _error;
}
//fullSem是佇列的可讀狀態,初始化時其值為0,即初始時佇列不可讀
if (sem_init(&pSched->fullSem, 0, 0) != 0) {
pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno));
goto _error;
}
if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) {
pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno));
goto _error;
}
memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg));
pSched->fullSlot = 0;//實始化時佇列為空,故隊頭和隊尾的位置都是0
pSched->emptySlot = 0;//實始化時佇列為空,故隊頭和隊尾的位置都是0
pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (int i = 0; i < pSched->numOfThreads; ++i) {
if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) {
pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno));
goto _error;
}
}
pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);
return (void *)pSched;
_error:
taosCleanUpScheduler(pSched);
return NULL;
}
再來看讀訊息的taosProcessSchedQueue函數這其實是消費者一方的實現,這個函數的主要邏輯是:
1.使用無限迴圈,只要佇列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續向下處理;
2.在操作msg前,加入互斥體防止msg被誤用;
3.讀操作完畢後修改fullSlot的值,注意這為避免fullSlot溢位,需要對於queueSize取餘。同時退出互斥體;
4.對emptySem進行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個訊息後,emptySem的值為6,即可寫狀態,且能接受的訊息數量為6。
具體程式碼及註釋如下:
void *taosProcessSchedQueue(void *param) {
SSchedMsg msg;
SSchedQueue *pSched = (SSchedQueue *)param;
//注意這裡是個無限迴圈,只要佇列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續處理
while (1) {
if (sem_wait(&pSched->fullSem) != 0) {
pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));
if (errno == EINTR) {
/* sem_wait is interrupted by interrupt, ignore and continue */
continue;
}
}
//加入互斥體防止msg被誤用。
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
msg = pSched->queue[pSched->fullSlot];
memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
//讀取完畢修改fullSlot的值,注意這為避免fullSlot溢位,需要對於queueSize取餘。
pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
//讀取完畢修改退出互斥體
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
pError("unlock %s queueMutex failed, reason:%sn", pSched->label, strerror(errno));
//讀取完畢對emptySem進行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個訊息後,emptySem的值為6,即可寫狀態,且能接受的訊息數量為6
if (sem_post(&pSched->emptySem) != 0)
pError("post %s emptySem failed, reason:%sn", pSched->label, strerror(errno));
if (msg.fp)
(*(msg.fp))(&msg);
else if (msg.tfp)
(*(msg.tfp))(msg.ahandle, msg.thandle);
}
}
最後寫訊息的taosScheduleTask函數也就是生產的實現,其基本邏輯是
1.寫佇列前先對emptySem進行減1操作,如emptySem原值為1,那麼減1後為0,也就是佇列已滿,必須在讀取訊息後,即emptySem進行post操作後,佇列才能進行可寫狀態。
2.加入互斥體防止msg被誤操作,寫入完成後退出互斥體
3.寫佇列完成後對fullSem進行加1操作,如fullSem原值為0,那麼加1後為1,也就是佇列可讀,咱們上面介紹的讀取taosProcessSchedQueue中sem_wait(&pSched->fullSem)不再阻塞就繼續向下。
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
SSchedQueue *pSched = (SSchedQueue *)qhandle;
if (pSched == NULL) {
pError("sched is not ready, msg:%p is dropped", pMsg);
return 0;
}
//在寫佇列前先對emptySem進行減1操作,如emptySem原值為1,那麼減1後為0,也就是佇列已滿,必須在讀取訊息後,即emptySem進行post操作後,佇列才能進行可寫狀態。
if (sem_wait(&pSched->emptySem) != 0) pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));
//加入互斥體防止msg被誤操作
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
pSched->queue[pSched->emptySlot] = *pMsg;
pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
//在寫佇列前先對fullSem進行加1操作,如fullSem原值為0,那麼加1後為1,也就是佇列可讀,咱們上面介紹的讀取函數可以進行處理。
if (sem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno));
return 0;
}
Java的高併發實現
從併發模型來看,Go和Rust都有channel這個概念,也都是通過Channel來實現線(協)程間的同步,由於channel帶有讀寫狀態且保證資料順序,而且channel的封裝程度和效率明顯可以做的更高,因此Go和Rust官方都會建議使用channel(通訊)來共享記憶體,而不是使用共享記憶體來通訊。
為了讓幫助大家找到區別,我們先以Java為例來,看一下沒有channel的高階語言Java,生產者消費者該如何實現,程式碼及註釋如下:
public class Storage {
// 倉庫最大儲存量
private final int MAX_SIZE = 10;
// 倉庫儲存的載體
private LinkedList<Object> list = new LinkedList<Object>();
// 鎖
private final Lock lock = new ReentrantLock();
// 倉庫滿的訊號量
private final Condition full = lock.newCondition();
// 倉庫空的訊號量
private final Condition empty = lock.newCondition();
public void produce()
{
// 獲得鎖
lock.lock();
while (list.size() + 1 > MAX_SIZE) {
System.out.println("【生產者" + Thread.currentThread().getName()
+ "】倉庫已滿");
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(new Object());
System.out.println("【生產者" + Thread.currentThread().getName()
+ "】生產一個產品,現庫存" + list.size());
empty.signalAll();
lock.unlock();
}
public void consume()
{
// 獲得鎖
lock.lock();
while (list.size() == 0) {
System.out.println("【消費者" + Thread.currentThread().getName()
+ "】倉庫為空");
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println("【消費者" + Thread.currentThread().getName()
+ "】消費一個產品,現庫存" + list.size());
full.signalAll();
lock.unlock();
}
}
在Java、C#這種面向物件,但是沒有channel語言中,生產者、消費者模式至少要藉助一個lock和兩個訊號量共同完成。其中鎖的作用是保證同是時間,倉庫中只有一個使用者進行資料的修改,而還需要表示倉庫滿的訊號量,一旦達到倉庫滿的情況則將此訊號量置為阻塞狀態,從而阻止其它生產者再向倉庫運商品了,反之倉庫空的訊號量也是一樣,一旦倉庫空了,也要阻其它消費者再前來消費了。
Go的高併發實現
我們剛剛也介紹過了Go語言中官方推薦使用channel來實現協程間通訊,所以不需要再新增lock和訊號量就能實現模式了,以下程式碼中我們通過子goroutine完成了生產者的功能,在在另一個子goroutine中實現了消費者的功能,注意要阻塞主goroutine以確保子goroutine能夠執行,從而輕而易舉的就這完成了生產者消費者模式。下面我們就通過具體實踐中來看一下生產者消費者模型的實現。
package main
import (
"fmt"
"time"
)
func Product(ch chan<- int) { //生產者
for i := 0; i < 3; i++ {
fmt.Println("Product produceed", i)
ch <- i //由於channel是goroutine安全的,所以此處沒有必要必須加鎖或者加lock操作.
}
}
func Consumer(ch <-chan int) {
for i := 0; i < 3; i++ {
j := <-ch //由於channel是goroutine安全的,所以此處沒有必要必須加鎖或者加lock操作.
fmt.Println("Consmuer consumed ", j)
}
}
func main() {
ch := make(chan int)
go Product(ch)//注意生產者與消費者放在不同goroutine中
go Consumer(ch)//注意生產者與消費者放在不同goroutine中
time.Sleep(time.Second * 1)//防止主goroutine退出
/*運行結果並不確定,可能為
Product produceed 0
Product produceed 1
Consmuer consumed 0
Consmuer consumed 1
Product produceed 2
Consmuer consumed 2
*/
}
可以看到和Java比起來使用GO來實現併發式的生產者消費者模式的確是更為清爽了。
Rust的高併發實現
不得不說Rust的難度實在太高了,雖然筆者之前在彙編、C、Java等方面的經驗可以幫助我快速掌握Go語言。但是假期看了兩天Rust真想大呼告辭,太勸退了。在Rust官方提供的功能中,其實並不包括多生產者、多消費者的channel,std:sync空間下只有一個多生產者單消費者(mpsc)的channel。其樣例實現如下:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
let tx2 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
let vals = vec![
String::from("1"),
String::from("3"),
String::from("5"),
String::from("7"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("11"),
String::from("13"),
String::from("15"),
String::from("17"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("21"),
String::from("23"),
String::from("25"),
String::from("27"),
];
for val in vals {
tx2.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for rec in rx {
println!("Got: {}", rec);
}
}
可以看到在Rust下實現生產者消費者是不難的,但是生產者可以clone多個,不過消費者卻只能有一個,究其原因是因為Rust下沒有GC也就是垃圾回收功能,而想保證安全Rust就必須要對於變更使用許可權進行嚴格管理。在Rust下使用move關鍵字進行變更的所有權轉移,但是按照Rust對於變更生產週期的管理規定,執行緒間許可權轉移的所有權接收者在同一時間只能有一個,這也是Rust官方只提供MPSC的原因。
use std::thread;
fn main() {
let s = "hello";
let handle = thread::spawn(move || {
println!("{}", s);
});
handle.join().unwrap();
}
當然Rust下有一個API比較貼心就是join,他可以所有子執行緒都執行結束再退出主執行緒,這比Go中要手工阻塞還是要有一定的提高。而如果你想用多生產者、多消費者的功能,就要入手crossbeam模組了,這個模組掌握起來難度也真的不低。
總結
通過上面的比較我們可以用一張表格來說明幾種主流語言的情況對比:
可以看到Rust以其高安全性、基本比肩C的運行及啟動速度必將在Serverless的時代獨佔鰲頭,Go基本也能緊隨其後,而C語言程式中難以避免的野指針,Java相對較低的運行及啟動速度,可能都不太適用於函數式運算的場景,Java在企業級開發的時代打敗各種C#之類的對手,但是在雲時代好像還真沒有之前統治力那麼強了,真可謂是打敗你的往往不是你的對手,而是其它空間的降維打擊。
華為:海思堅持研發尖端半導體,不會進行任何重組或裁員;榮耀50即將釋出,預約人數破百萬;Grafana 8.0釋出|極客頭條
相關文章
作者 | 馬超 責編 | 張紅月出品 | CSDN部落格Serverless的核心理念就是函數式計算,開發者無需再關注具體的模組,雲上部署的粒度變成了程式函數,自動伸縮、擴容等工作完全
2021-06-16 03:03:40
今日,交通運輸部、國家發展改革委、財政部公開了《全面推廣高速公路差異化收費實施方案》,從方案來看,我國高速公路的收費將發生重大變化。方案公佈了差異化收費的六大重點任務
2021-06-16 03:02:41
【手機中國新聞】今年2月,華為釋出了新一代摺疊旗艦——華為Mate X2。當時很多人都為該機「雙旋水滴鉸鏈」的設計所驚豔,但除此以外,那朵隨螢幕展開而緩緩綻放的藍色花朵相信也
2021-06-16 03:02:10
在20世紀70年代引進一種外來的捕食性蝸牛後,南太平洋社會群島的50多種樹蝸牛被消滅了,但白殼的P. hyalina蝸牛卻倖存下來。現在,密歇根大學生物學家和擁有世界上最小計算機的工
2021-06-16 03:01:25
【6月16日訊】自從華為鴻蒙OS系統2.0正式版釋出以後,華為鴻蒙手機系統就成為了外界高度關注的目標,對於華為鴻蒙OS系統的實際使用表現、應用生態建設等等,都成為了廣大網友們所
2021-06-16 03:01:09
整合顯示卡(核芯顯示卡),曾經是羸弱的代名詞,更多的用途不過是良機、畫面輸出,玩遊戲都是奢望,但是近年來,無論Intel還是AMD,核顯效能、功能都突飛猛進,已經讓入門級獨立顯示卡徹底
2021-06-16 03:00:15