<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
前面學習了很多多執行緒和任務的基礎知識,這裡要來實踐一下啦。通過本篇教學,你可以寫出一個簡單的工作流引擎。
本篇教學內容完成是基於任務的,只需要看過筆者的三篇關於非同步的文章,掌握 C# 基礎,即可輕鬆完成。
由於本篇文章編寫的工作流程式,主要使用任務,有些邏輯過程會比較難理解,多測試一下就好。程式碼主要還是 C# 基礎,為什麼說簡單?
因為是基於任務(Task)的,所以可以輕鬆設計組合流程,組成複雜的工作流。
由於只是講述基礎,所以不會包含很多種流程控制,這裡只實現一些簡單的。
先說明,別用到業務上。。。這個工作流非常簡單,就幾個功能,這個工作流是基於筆者的多執行緒系列文章的知識點。寫這個東西是為了講解任務操作,讓讀者更加深入理解任務。
程式碼地址:https://github.com/whuanle/CZGL.FLow
在開始前,我們來設計幾種流程控制的東西。
將一個 步驟/流程/節點 稱為 step。
一個普通的節點,包含一個任務。
多個 Then 節點,可以組成一條連續的工作流。
並行節點,可以設定多個並行節點放到 Parallel 中,以及在裡面為任一個節點建立新的分支。
定時節點,建立後會在一定時間後執行節點中的任務。
讓當前任務阻塞一段時間。
開啟你的 VS ,建立專案,Nuget 參照 CZGL.DoFlow
,版本 1.0.2 。
建立一個類 MyFlow1
,繼承 IDoFlow
。
public class MyFlow1 : IDoFlow { public int Id => 1; public string Name => "隨便起個名字"; public int Version => 1; public IDoFlowBuilder Build(IDoFlowBuilder builder) { throw new NotImplementedException(); } }
你可以建立多個工作流任務,每個工作流的 Id 必須唯一。Name 和 Version 隨便填,因為這裡筆者沒有對這幾個欄位做邏輯。
IDoFlowBuilder
是構建工作流的一個介面。
我們來寫一個工作流測試一下。
/// <summary> /// 普通節點 Then 使用方法 /// </summary> public class MyFlow1 : IDoFlow { public int Id => 1; public string Name => "test"; public int Version => 1; public IDoFlowBuilder Build(IDoFlowBuilder builder) { builder.StartWith(() => { Console.WriteLine("工作流開始"); }).Then(() => { Console.WriteLine("下一個節點"); }).Then(() => { Console.WriteLine("最後一個節點"); }); return builder; } }
Main 方法中:
static void Main(string[] args) { FlowCore.RegisterWorkflow<MyFlow1>(); // FlowCore.RegisterWorkflow(new MyFlow1()); FlowCore.Start(1); Console.ReadKey(); }
.StartWith()
方法開始一個工作流;
FlowCore.RegisterWorkflow<T>()
註冊一個工作流;
FlowCore.Start();
執行一個工作流;
其程式碼如下:
/// <summary> /// 並行節點 Parallel 使用方法 /// </summary> public class MyFlow2 : IDoFlow { public int Id => 2; public string Name => "test"; public int Version => 1; public IDoFlowBuilder Build(IDoFlowBuilder builder) { builder.StartWith() .Parallel(steps => { // 每個並行任務也可以設計後面繼續執行其它任務 steps.Do(() => { Console.WriteLine("並行1"); }).Do(() => { Console.WriteLine("並行2"); }); steps.Do(() => { Console.WriteLine("並行3"); }); // 並行任務設計完成後,必須呼叫此方法 // 此方法必須放在所有並行任務 .Do() 的最後 steps.EndParallel(); // 如果 .Do() 在 EndParallel() 後,那麼不會等待此任務 steps.Do(() => { Console.WriteLine("並行非同步"); }); // 開啟新的分支 steps.StartWith() .Then(() => { Console.WriteLine("新的分支" + Task.CurrentId); }).Then(() => { Console.WriteLine("分支2.0" + Task.CurrentId); }); }, false) .Then(() => { Console.WriteLine("11111111111111111 "); }); return builder; } }
Main 方法中:
static void Main(string[] args) { FlowCore.RegisterWorkflow<MyFlow2>(); FlowCore.Start(2); Console.ReadKey(); }
通過以上範例,可以大概瞭解本篇文章中我們要寫的程式。
建立一個類庫專案,名為 DoFlow
。
建立 Extensions
、Interfaces
、Services
三個目錄。
新建 IStepBuilder
介面檔案到 Interfaces
目錄,其內容如下:
using System; namespace DoFlow.Interfaces { public interface IStepBuilder { /// <summary> /// 普通節點 /// </summary> /// <param name="stepBuilder"></param> /// <returns></returns> IStepBuilder Then(Action action); /// <summary> /// 多個節點 /// <para>預設下,需要等待所有的任務完成,這個step才算完成</para> /// </summary> /// <param name="action"></param> /// <param name="anyWait">任意一個任務完成即可跳轉到下一個step</param> /// <returns></returns> IStepBuilder Parallel(Action<IStepParallel> action, bool anyWait = false); /// <summary> /// 節點將在某個時間間隔後執行 /// <para>非同步,不會阻塞當前工作流的執行,計劃任務將在一段時間後觸發</para> /// </summary> /// <returns></returns> IStepBuilder Schedule(Action action, TimeSpan time); /// <summary> /// 阻塞一段時間 /// </summary> /// <param name="time"></param> /// <returns></returns> IStepBuilder Delay(TimeSpan time); } }
新建 IStepParallel
檔案到 Interfaces
目錄。
using System; namespace DoFlow.Interfaces { /// <summary> /// 並行任務 /// <para>預設情況下,只有這個節點的所有並行任務都完成後,這個節點才算完成</para> /// </summary> public interface IStepParallel { /// <summary> /// 一個並行任務 /// </summary> /// <param name="action"></param> /// <returns></returns> IStepParallel Do(Action action); /// <summary> /// 開始一個分支 /// </summary> /// <param name="action"></param> /// <returns></returns> IStepBuilder StartWith(Action action = null); /// <summary> /// 必須使用此方法結束一個並行任務 /// </summary> void EndParallel(); } /// <summary> /// 並行任務 /// <para>任意一個任務完成後,就可以跳轉到下一個 step</para> /// </summary> public interface IStepParallelAny : IStepParallel { } }
新建 IDoFlowBuilder
介面檔案到 Interfaces
目錄。
using System; using System.Threading.Tasks; namespace DoFlow.Interfaces { /// <summary> /// 構建工作流任務 /// </summary> public interface IDoFlowBuilder { /// <summary> /// 開始一個 step /// </summary> IStepBuilder StartWith(Action action = null); void EndWith(Action action); Task ThatTask { get; } } }
新建 IDoFlow
介面檔案到 Interfaces
目錄。
namespace DoFlow.Interfaces { /// <summary> /// 工作流 /// <para>無引數傳遞</para> /// </summary> public interface IDoFlow { /// <summary> /// 全域性唯一標識 /// </summary> int Id { get; } /// <summary> /// 標識此工作流的名稱 /// </summary> string Name { get; } /// <summary> /// 標識此工作流的版本 /// </summary> int Version { get; } IDoFlowBuilder Build(IDoFlowBuilder builder); } }
新建 DependencyInjectionService
檔案到 Services
目錄。
用於實現依賴注入和解耦。
using DoFlow.Extensions; using Microsoft.Extensions.DependencyInjection; using System; namespace DoFlow.Services { /// <summary> /// 依賴注入服務 /// </summary> public static class DependencyInjectionService { private static IServiceCollection _servicesList; private static IServiceProvider _services; static DependencyInjectionService() { IServiceCollection services = new ServiceCollection(); _servicesList = services; // 注入引擎需要的服務 InitExtension.StartInitExtension(); var serviceProvider = services.BuildServiceProvider(); _services = serviceProvider; } /// <summary> /// 新增一個注入到容器服務 /// </summary> /// <typeparam name="TService"></typeparam> /// <typeparam name="TImplementation"></typeparam> public static void AddService<TService, TImplementation>() where TService : class where TImplementation : class, TService { _servicesList.AddTransient<TService, TImplementation>(); } /// <summary> /// 獲取需要的服務 /// </summary> /// <typeparam name="TIResult"></typeparam> /// <returns></returns> public static TIResult GetService<TIResult>() { TIResult Tservice = _services.GetService<TIResult>(); return Tservice; } } }
新增一個 InitExtension
檔案到 Extensions
目錄。
using DoFlow.Interfaces; using DoFlow.Services; namespace DoFlow.Extensions { public static class InitExtension { private static bool IsInit = false; public static void StartInitExtension() { if (IsInit) return; IsInit = true; DependencyInjectionService.AddService<IStepBuilder, StepBuilder>(); DependencyInjectionService.AddService<IDoFlowBuilder, DoFlowBuilder>(); DependencyInjectionService.AddService<IStepParallel, StepParallelWhenAll>(); DependencyInjectionService.AddService<IStepParallelAny, StepParallelWhenAny>(); } } }
以下檔案均在 Services
目錄建立。
新建 StepBuilder
檔案,用於解析節點,構建任務。
using DoFlow.Interfaces; using System; using System.Threading.Tasks; namespace DoFlow.Services { /// <summary> /// 節點工作引擎 /// </summary> public class StepBuilder : IStepBuilder { private Task _task; /// <summary> /// 延遲執行 /// </summary> /// <param name="time"></param> /// <returns></returns> public IStepBuilder Delay(TimeSpan time) { Task.Delay(time).Wait(); return this; } /// <summary> /// 並行 step /// </summary> /// <param name="action"></param> /// <returns></returns> public IStepBuilder Parallel(Action<IStepParallel> action, bool anyAwait = false) { IStepParallel parallel = anyAwait ? DependencyInjectionService.GetService<IStepParallelAny>() : DependencyInjectionService.GetService<IStepParallel>(); Task task = new Task(() => { action.Invoke(parallel); }); _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); }); _task = task; return this; } /// <summary> /// 計劃任務 /// </summary> /// <param name="action"></param> /// <param name="time"></param> /// <returns></returns> public IStepBuilder Schedule(Action action, TimeSpan time) { Task.Factory.StartNew(() => { Task.Delay(time).Wait(); action.Invoke(); }); return this; } /// <summary> /// 普通 step /// </summary> /// <param name="action"></param> /// <returns></returns> public IStepBuilder Then(Action action) { Task task = new Task(action); _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); task.Wait(); }); _task = task; return this; } public void SetTask(Task task) { _task = task; } } }
新建 StepParallel
檔案,裡面有兩個類,用於實現同步任務。
using DoFlow.Interfaces; using System; using System.Collections.Generic; using System.Threading.Tasks; namespace DoFlow.Services { /// <summary> /// 第一層所有任務結束後才能跳轉下一個 step /// </summary> public class StepParallelWhenAll : IStepParallel { private Task _task; private readonly List<Task> _tasks = new List<Task>(); public StepParallelWhenAll() { _task = new Task(() => { },TaskCreationOptions.AttachedToParent); } public IStepParallel Do(Action action) { _tasks.Add(Task.Run(action)); return this; } public void EndParallel() { _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { Task.WhenAll(_tasks).Wait(); }); } public IStepBuilder StartWith(Action action = null) { Task task = action is null ? new Task(() => { }) : new Task(action); var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>(); _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); }); return _stepBuilder; } } /// <summary> /// 完成任意一個任務即可跳轉到下一個 step /// </summary> public class StepParallelWhenAny : IStepParallelAny { private Task _task; private readonly List<Task> _tasks = new List<Task>(); public StepParallelWhenAny() { _task = Task.Run(() => { }); } public IStepParallel Do(Action action) { _tasks.Add(Task.Run(action)); return this; } public void EndParallel() { _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { Task.WhenAny(_tasks).Wait(); }); } public IStepBuilder StartWith(Action action = null) { Task task = action is null ? new Task(() => { }) : new Task(action); var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>(); _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); }); return _stepBuilder; } } }
新建 DoFlowBuilder
檔案,用於構建工作流。
using DoFlow.Interfaces; using System; using System.Threading.Tasks; namespace DoFlow.Services { public class DoFlowBuilder : IDoFlowBuilder { private Task _task; public Task ThatTask => _task; public void EndWith(Action action) { _task.Start(); } public IStepBuilder StartWith(Action action = null) { if (action is null) _task = new Task(() => { }); else _task = new Task(action); IStepBuilder _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>(); ((StepBuilder)_stepBuilder).SetTask(_task); return _stepBuilder; } } }
新建 FlowEngine
檔案,用於執行工作流。
using DoFlow.Interfaces; namespace DoFlow.Services { /// <summary> /// 工作流引擎 /// </summary> public class FlowEngine { private readonly IDoFlow _flow; public FlowEngine(IDoFlow flow) { _flow = flow; } /// <summary> /// 開始一個工作流 /// </summary> public void Start() { IDoFlowBuilder builder = DependencyInjectionService.GetService<IDoFlowBuilder>(); _flow.Build(builder).ThatTask.Start(); } } }
新建 FlowCore
檔案,用於儲存和索引工作流。使用讀寫鎖解決並行字典問題。
using DoFlow.Interfaces; using System; using System.Collections.Generic; using System.Threading; namespace DoFlow.Services { public static class FlowCore { private static Dictionary<int, FlowEngine> flowEngines = new Dictionary<int, FlowEngine>(); // 讀寫鎖 private static ReaderWriterLockSlim readerWriterLockSlim = new ReaderWriterLockSlim(); /// <summary> /// 註冊工作流 /// </summary> /// <param name="flow"></param> public static bool RegisterWorkflow(IDoFlow flow) { try { readerWriterLockSlim.EnterReadLock(); if (flowEngines.ContainsKey(flow.Id)) return false; flowEngines.Add(flow.Id, new FlowEngine(flow)); return true; } finally { readerWriterLockSlim.ExitReadLock(); } } /// <summary> /// 註冊工作流 /// </summary> /// <param name="flow"></param> public static bool RegisterWorkflow<TDoFlow>() { Type type = typeof(TDoFlow); IDoFlow flow = (IDoFlow)Activator.CreateInstance(type); try { readerWriterLockSlim.EnterReadLock(); if (flowEngines.ContainsKey(flow.Id)) return false; flowEngines.Add(flow.Id, new FlowEngine(flow)); return true; } finally { readerWriterLockSlim.ExitReadLock(); } } /// <summary> /// 要啟動的工作流 /// </summary> /// <param name="id"></param> public static bool Start(int id) { FlowEngine engine; // 讀寫鎖 try { readerWriterLockSlim.EnterUpgradeableReadLock(); if (!flowEngines.ContainsKey(id)) return default; try { readerWriterLockSlim.EnterWriteLock(); engine = flowEngines[id]; } catch { return default; } finally { readerWriterLockSlim.ExitWriteLock(); } } catch { return default; } finally { readerWriterLockSlim.ExitUpgradeableReadLock(); } engine.Start(); return true; } } }
就這樣程式寫完了。
到此這篇關於C#多執行緒系列之工作流實現的文章就介紹到這了。希望對大家的學習有所幫助,也希望大家多多支援it145.com。
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45