<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Fastflow 是什麼?用一句話來定義它:一個 基於golang協程
、支援水平擴容
的分散式高效能工作流框架
。
它具有以下特點:
組內有很多專案都涉及複雜的任務流場景,比如離線任務,叢集上下架,容器遷移等,這些場景都有幾個共同的特點:
流程耗時且步驟複雜,比如建立一個 k8s 叢集,需要幾十步操作,其中包含指令碼執行、介面呼叫等,且相互存在依賴關係。
任務量巨大,比如容器平臺每天都會有幾十萬的離線任務需要排程執行、再比如我們管理數百個K8S叢集,幾乎每天會有叢集需要上下節點、遷移容器等。
我們嘗試過各種解法:
當然 Github 上也還有其他的任務流引擎,我們也都評估過,無法滿足需求。比如 kubeflow 是基於 Pod 執行任務的,比起 程序
更為重量,還有一些專案,要麼就是沒有經過海量資料的考驗,要麼就是沒有考慮可伸縮性,面對大量任務的執行無法水平擴容。
fastflow 的工作流模型基於 DAG(Directed acyclic graph),下圖是一個簡單的 DAG 示意圖:
在這個圖中,首先 A 節點所定義的任務會被執行,當 A 執行完畢後,B、C兩個節點所定義的任務將同時被觸發,而只有 B、C 兩個節點都執行成功後,最後的 D 節點才會被觸發,這就是 fastflow 的工作流模型。
fastflow 執行任務的過程會涉及到幾個概念:Dag, Task, Action, DagInstance
描述了一個完整流程,它的每個節點被稱為 Task
,它定義了各個 Task 的執行順序和依賴關係,你可以通過程式設計
or yaml
來定義它
一個程式設計式定義的DAG
dag := &entity.Dag{ BaseInfo: entity.BaseInfo{ ID: "test-dag", }, Name: "test", Tasks: []entity.Task{ {ID: "task1", ActionName: "PrintAction"}, {ID: "task2", ActionName: "PrintAction", DependOn: []string{"task1"}}, {ID: "task3", ActionName: "PrintAction", DependOn: []string{"task2"}}, }, }
對應的yaml如下:
id: "test-dag" name: "test" tasks: - id: "task1" actionName: "PrintAction" - id: ["task2"] actionName: "PrintAction" dependOn: ["task1"] - id: "task3" actionName: "PrintAction" dependOn: ["task2"]
同時 Dag 可以定義這個工作流所需要的引數,以便於在各個 Task 去消費它:
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" filePath: desc: "the file path" defaultValue: "/tmp/" tasks: - id: "task1" actionName: "PrintAction" params: writeName: "{{fileName}}" writePath: "{{filePath}}"
它定義了這個節點的具體工作,比如是要發起一個 http 請求,或是執行一段指令碼等,這些不同動作都通過選擇不同的 Action
來實現,同時它也可以定義在何種條件下需要跳過 or 阻塞該節點。
下面這段yaml演示了 Task 如何根據某些條件來跳過執行該節點。
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" tasks: - id: "task1" actionName: "PrintAction" preCheck: - act: skip #you can set "skip" or "block" conditions: - source: vars # source could be "vars" or "share-data" key: "fileName" op: "in" values: ["warn.txt", "error.txt"]
Task 的狀態有以下幾個:
Action 是工作流的核心,定義了該節點將執行什麼操作,fastflow攜帶了一些開箱即用的Action,但是一般你都需要根據具體的業務場景自行編寫,它有幾個關鍵屬性:
自行開發的 Action 在使用前都必須先註冊到 fastflow,如下所示:
type PrintParams struct { Key string Value string } type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { cinput := params.(*ActionParam) fmt.Println("action start: ", time.Now()) fmt.Println(fmt.Sprintf("params: key[%s] value[%s]", cinput.Key, cinput.Value)) return nil } func (a *PrintAction) ParameterNew() interface{} { return &PrintParams{} } func main() { ... // Register action fastflow.RegisterAction([]run.Action{ &PrintAction{}, }) ... }
當你開始執行一個 Dag 後,則會為本次執行生成一個執行記錄,它被稱為 DagInstance
,當它生成以後,會由 Leader 範例將其分發到一個健康的 Worker,再由其解析、執行。
首先 fastflow 是一個分散式的框架,意味著你可以部署多個範例來分擔負載,而範例被分為兩類角色:
協程
執行其中的任務而不同節點能夠承擔不同的功能,其背後是不同的 模組
在各司其職,不同節點所執行的模組如下圖所示:
NOTE
仲裁者
角色的 Worker,因此它也會分擔工作負載。從上面的圖看,Leader 範例會比 Worker 範例多執行一些模組用於執行中仲裁者相關的任務,模組之間的共同作業關係如下圖所示:
其中各個模組的職責如下:
Tips
以上模組的分佈機制僅僅只是 fastflow 的預設實現,你也可以自行決定範例執行的模組,比如在 Leader 上不再執行 Worker 的範例,讓其專注於任務排程。
更多例子請參考專案下面的
examples
目錄
如果已經你已經有了可測試的範例,可以直接替換為你的範例,如果沒有的話,可以使用Docker容器在本地跑一個,指令如下:
docker run -d --name fastflow-mongo --network host mongo
執行以下範例
package main import ( "fmt" "log" "time" "github.com/shiningrush/fastflow" mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo" "github.com/shiningrush/fastflow/pkg/entity/run" "github.com/shiningrush/fastflow/pkg/mod" mongoStore "github.com/shiningrush/fastflow/store/mongo" ) type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { fmt.Println("action start: ", time.Now()) return nil } func main() { // Register action fastflow.RegisterAction([]run.Action{ &PrintAction{}, }) // init keeper, it used to e keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{ Key: "worker-1", // if your mongo does not set user/pwd, youshould remove it ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := keeper.Init(); err != nil { log.Fatal(fmt.Errorf("init keeper failed: %w", err)) } // init store st := mongoStore.NewStore(&mongoStore.StoreOption{ // if your mongo does not set user/pwd, youshould remove it ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := st.Init(); err != nil { log.Fatal(fmt.Errorf("init store failed: %w", err)) } go createDagAndInstance() // start fastflow if err := fastflow.Start(&fastflow.InitialOption{ Keeper: keeper, Store: st, // use yaml to define dag ReadDagFromDir: "./", }); err != nil { panic(fmt.Sprintf("init fastflow failed: %s", err)) } } func createDagAndInstance() { // wait fast start completed time.Sleep(time.Second) // run some dag instance for i := 0; i < 10; i++ { _, err := mod.GetCommander().RunDag("test-dag", nil) if err != nil { log.Fatal(err) } time.Sleep(time.Second * 10) } }
程式執行目錄下的test-dag.yaml
id: "test-dag" name: "test" tasks: - id: "task1" actionName: "PrintAction" - id: "task2" actionName: "PrintAction" dependOn: ["task1"] - id: "task3" actionName: "PrintAction" dependOn: ["task2"]
由於任務都是基於 goroutine
來執行,因此任務之間的 context
是共用的,意味著你完全可以使用以下的程式碼:
func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.WithValue("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.Context().Value("key") return nil }
但是注意這樣做有個弊端:當節點重啟時,如果任務尚未執行完畢,那麼這部分內容會丟失。
如果不想因為故障or升級而丟失你的更改,可以使用 ShareData 來傳遞進行通訊,ShareData 是整個 在整個 DagInstance 的生命週期都會共用的一塊資料空間,每次對它的寫入都會通過 Store
元件持久化,以確保資料不會丟失,用法如下:
func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.ShareData().Set("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.ShareData().Get("key") return nil }
fastflow 還提供了 Task 粒度的紀錄檔記錄,這些紀錄檔都會通過 Store
元件持久化,用法如下:
func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { ctx.Trace("some message") return nil }
上面的文章中提到,我們可以在 Dag 中定義一些變數,在建立工作流時可以對這些變數進行賦值,比如以下的Dag,定義了一個名為 `fileName 的變數
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt"
隨後我們可以使用 Commander
元件來啟動一個具體的工作流:
mod.GetCommander().RunDag("test-id", map[string]string{ "fileName": "demo.txt", })
這樣本次啟動的工作流的變數則被賦值為 demo.txt
,接下來我們有兩種方式去消費它
1.帶引數的Action
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" tasks: - id: "task1" action: "PrintAction" params: # using {{var}} to consume dag's variable fileName: "{{fileName}}"
PrintAction.go:
type PrintParams struct { FileName string `json:"fileName"` } type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { cinput := params.(*ActionParam) fmt.Println(fmt.Sprintf("params: file[%s]", cinput.FileName, cinput.Value)) return nil } func (a *PrintAction) ParameterNew() interface{} { return &PrintParams{} }
2.程式設計式讀取
fastflow 也提供了相關函數來獲取 Dag 變數
func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { // get variable by name ctx.GetVar("fileName") // iterate variables ctx.IterateVars(func(key, val string) (stop bool) { ... }) return nil }
如前所述,你可以在直接使用 Keeper
模組提供的分散式鎖,如下所示:
... mod.GetKeeper().NewMutex("mutex key").Lock(ctx.Context(), mod.LockTTL(time.Second), mod.Reentrant("worker-key1")) ...
其中:
LockTTL
表示你持有該鎖的TTL,到期之後會自動釋放,預設 30s
Reentrant
用於需要實現可重入的分散式鎖的場景,作為持有場景的標識,預設為空,表示該鎖不可重入 歡迎轉載,註明出處即可。如果你覺得這篇博文幫助到你了,請點下右下角的推薦讓更多人看到它。到此這篇關於基於golang的輕量級工作流框架Fastflow的文章就介紹到這了,更多相關go Fastflow內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45