首頁 > 軟體

C#多執行緒系列之工作流實現

2022-02-14 13:00:41

前言

前面學習了很多多執行緒和任務的基礎知識,這裡要來實踐一下啦。通過本篇教學,你可以寫出一個簡單的工作流引擎。

本篇教學內容完成是基於任務的,只需要看過筆者的三篇關於非同步的文章,掌握 C# 基礎,即可輕鬆完成。

由於本篇文章編寫的工作流程式,主要使用任務,有些邏輯過程會比較難理解,多測試一下就好。程式碼主要還是 C# 基礎,為什麼說簡單?

  • 不包含 async 、await
  • 幾乎不含包含多執行緒(有個讀寫鎖)
  • 不包含表示式樹
  • 幾乎不含反射(有個小地方需要反射一下,但是非常簡單)
  • 沒有複雜的演演算法

因為是基於任務(Task)的,所以可以輕鬆設計組合流程,組成複雜的工作流。

由於只是講述基礎,所以不會包含很多種流程控制,這裡只實現一些簡單的。

先說明,別用到業務上。。。這個工作流非常簡單,就幾個功能,這個工作流是基於筆者的多執行緒系列文章的知識點。寫這個東西是為了講解任務操作,讓讀者更加深入理解任務。

程式碼地址:https://github.com/whuanle/CZGL.FLow

節點

在開始前,我們來設計幾種流程控制的東西。

將一個 步驟/流程/節點 稱為 step。

Then

一個普通的節點,包含一個任務。

多個 Then 節點,可以組成一條連續的工作流。

Parallel

並行節點,可以設定多個並行節點放到 Parallel 中,以及在裡面為任一個節點建立新的分支。

Schedule

定時節點,建立後會在一定時間後執行節點中的任務。

Delay

讓當前任務阻塞一段時間。

試用一下

順序節點

開啟你的 VS ,建立專案,Nuget 參照 CZGL.DoFlow ,版本 1.0.2 。

建立一個類 MyFlow1,繼承 IDoFlow

    public class MyFlow1 : IDoFlow
    {
        public int Id => 1;

        public string Name => "隨便起個名字";

        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            throw new NotImplementedException();
        }
    }

你可以建立多個工作流任務,每個工作流的 Id 必須唯一。Name 和 Version 隨便填,因為這裡筆者沒有對這幾個欄位做邏輯。

IDoFlowBuilder 是構建工作流的一個介面。

我們來寫一個工作流測試一下。

/// <summary>
/// 普通節點 Then 使用方法
/// </summary>
public class MyFlow1 : IDoFlow
{
    public int Id => 1;
    public string Name => "test";
    public int Version => 1;

    public IDoFlowBuilder Build(IDoFlowBuilder builder)
    {
        builder.StartWith(() =>
        {
            Console.WriteLine("工作流開始");
        }).Then(() =>
        {
            Console.WriteLine("下一個節點");
        }).Then(() =>
         {
             Console.WriteLine("最後一個節點");
         });
        return builder;
    }
} 

Main 方法中:

        static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow1>();
            // FlowCore.RegisterWorkflow(new MyFlow1());
            FlowCore.Start(1);
            Console.ReadKey();
        }

.StartWith() 方法開始一個工作流;

FlowCore.RegisterWorkflow<T>() 註冊一個工作流;

FlowCore.Start();執行一個工作流;

並行任務

其程式碼如下:

    /// <summary>
    /// 並行節點 Parallel 使用方法
    /// </summary>
    public class MyFlow2 : IDoFlow
    {
        public int Id => 2;
        public string Name => "test";
        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            builder.StartWith()
                .Parallel(steps =>
                {
                    // 每個並行任務也可以設計後面繼續執行其它任務
                    steps.Do(() =>
                    {
                        Console.WriteLine("並行1");
                    }).Do(() =>
                    {
                        Console.WriteLine("並行2");
                    });
                    steps.Do(() =>
                    {
                        Console.WriteLine("並行3");
                    });

                    // 並行任務設計完成後,必須呼叫此方法
                    // 此方法必須放在所有並行任務 .Do() 的最後
                    steps.EndParallel();

                    // 如果 .Do() 在 EndParallel() 後,那麼不會等待此任務
                    steps.Do(() => { Console.WriteLine("並行非同步"); });

                    // 開啟新的分支
                    steps.StartWith()
                    .Then(() =>
                    {
                        Console.WriteLine("新的分支" + Task.CurrentId);
                    }).Then(() => { Console.WriteLine("分支2.0" + Task.CurrentId); });

                }, false)
                .Then(() =>
                {
                    Console.WriteLine("11111111111111111 ");
                });

            return builder;
        }
    }

Main 方法中:

        static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow2>();
            FlowCore.Start(2);
            Console.ReadKey();
        }

通過以上範例,可以大概瞭解本篇文章中我們要寫的程式。

編寫工作流

建立一個類庫專案,名為 DoFlow

建立 ExtensionsInterfacesServices 三個目錄。

介面構建器

新建 IStepBuilder 介面檔案到 Interfaces 目錄,其內容如下:

using System;

namespace DoFlow.Interfaces
{
    public interface IStepBuilder
    {
        /// <summary>
        /// 普通節點
        /// </summary>
        /// <param name="stepBuilder"></param>
        /// <returns></returns>
        IStepBuilder Then(Action action);

        /// <summary>
        /// 多個節點
        /// <para>預設下,需要等待所有的任務完成,這個step才算完成</para>
        /// </summary>
        /// <param name="action"></param>
        /// <param name="anyWait">任意一個任務完成即可跳轉到下一個step</param>
        /// <returns></returns>
        IStepBuilder Parallel(Action<IStepParallel> action, bool anyWait = false);

        /// <summary>
        /// 節點將在某個時間間隔後執行
        /// <para>非同步,不會阻塞當前工作流的執行,計劃任務將在一段時間後觸發</para>
        /// </summary>
        /// <returns></returns>
        IStepBuilder Schedule(Action action, TimeSpan time);

        /// <summary>
        /// 阻塞一段時間
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        IStepBuilder Delay(TimeSpan time);
    }
}

新建 IStepParallel 檔案到 Interfaces 目錄。

using System;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 並行任務
    ///  <para>預設情況下,只有這個節點的所有並行任務都完成後,這個節點才算完成</para>
    /// </summary>
    public interface IStepParallel
    {
        /// <summary>
        /// 一個並行任務
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepParallel Do(Action action);

        /// <summary>
        /// 開始一個分支
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepBuilder StartWith(Action action = null);

        /// <summary>
        /// 必須使用此方法結束一個並行任務
        /// </summary>
        void EndParallel();
    }

    /// <summary>
    /// 並行任務
    /// <para>任意一個任務完成後,就可以跳轉到下一個 step</para>
    /// </summary>
    public interface IStepParallelAny : IStepParallel
    {

    }
}

工作流構建器

新建 IDoFlowBuilder 介面檔案到 Interfaces 目錄。

using System;
using System.Threading.Tasks;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 構建工作流任務
    /// </summary>
    public interface IDoFlowBuilder
    {
        /// <summary>
        /// 開始一個 step
        /// </summary>
        IStepBuilder StartWith(Action action = null);
        void EndWith(Action action);

        Task ThatTask { get; }
    }
}

新建 IDoFlow 介面檔案到 Interfaces 目錄。

namespace DoFlow.Interfaces
{

    /// <summary>
    /// 工作流
    /// <para>無引數傳遞</para>
    /// </summary>
    public interface IDoFlow
    {
        /// <summary>
        /// 全域性唯一標識
        /// </summary>
        int Id { get; }

        /// <summary>
        /// 標識此工作流的名稱
        /// </summary>
        string Name { get; }

        /// <summary>
        /// 標識此工作流的版本
        /// </summary>
        int Version { get; }

        IDoFlowBuilder Build(IDoFlowBuilder builder);
    }
}

依賴注入

新建 DependencyInjectionService 檔案到 Services 目錄。

用於實現依賴注入和解耦。

using DoFlow.Extensions;
using Microsoft.Extensions.DependencyInjection;
using System;

namespace DoFlow.Services
{
    /// <summary>
    /// 依賴注入服務
    /// </summary>
    public static class DependencyInjectionService
    {
        private static IServiceCollection _servicesList;
        private static IServiceProvider _services;
        static DependencyInjectionService()
        {
            IServiceCollection services = new ServiceCollection();
            _servicesList = services;
            // 注入引擎需要的服務
            InitExtension.StartInitExtension();
            var serviceProvider = services.BuildServiceProvider();
            _services = serviceProvider;
        }

        /// <summary>
        /// 新增一個注入到容器服務
        /// </summary>
        /// <typeparam name="TService"></typeparam>
        /// <typeparam name="TImplementation"></typeparam>
        public static void AddService<TService, TImplementation>()
            where TService : class
            where TImplementation : class, TService
        {
            _servicesList.AddTransient<TService, TImplementation>();
        }

        /// <summary>
        /// 獲取需要的服務
        /// </summary>
        /// <typeparam name="TIResult"></typeparam>
        /// <returns></returns>
        public static TIResult GetService<TIResult>()
        {
            TIResult Tservice = _services.GetService<TIResult>();
            return Tservice;
        }
    }
}

新增一個 InitExtension 檔案到 Extensions 目錄。

using DoFlow.Interfaces;
using DoFlow.Services;

namespace DoFlow.Extensions
{
    public static class InitExtension
    {
        private static bool IsInit = false;
        public static void StartInitExtension()
        {
            if (IsInit) return;
            IsInit = true;
            DependencyInjectionService.AddService<IStepBuilder, StepBuilder>();
            DependencyInjectionService.AddService<IDoFlowBuilder, DoFlowBuilder>();
            DependencyInjectionService.AddService<IStepParallel, StepParallelWhenAll>();
            DependencyInjectionService.AddService<IStepParallelAny, StepParallelWhenAny>();
        }
    }
}

實現工作流解析

以下檔案均在 Services 目錄建立。

新建 StepBuilder 檔案,用於解析節點,構建任務。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;

namespace DoFlow.Services
{

    /// <summary>
    /// 節點工作引擎
    /// </summary>
    public class StepBuilder : IStepBuilder
    {
        private Task _task;

        /// <summary>
        /// 延遲執行
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Delay(TimeSpan time)
        {
            Task.Delay(time).Wait();
            return this;
        }

        /// <summary>
        /// 並行 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Parallel(Action<IStepParallel> action, bool anyAwait = false)
        {
            IStepParallel parallel = anyAwait ? DependencyInjectionService.GetService<IStepParallelAny>() : DependencyInjectionService.GetService<IStepParallel>();
            Task task = new Task(() =>
            {
                action.Invoke(parallel);
            });

            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
            });
            _task = task;
            return this;
        }

        /// <summary>
        /// 計劃任務
        /// </summary>
        /// <param name="action"></param>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Schedule(Action action, TimeSpan time)
        {
            Task.Factory.StartNew(() =>
            {
                Task.Delay(time).Wait();
                action.Invoke();
            });
            return this;
        }

        /// <summary>
        /// 普通 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Then(Action action)
        {
            Task task = new Task(action);
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
                task.Wait();
            });
            _task = task;
            return this;
        }

        public void SetTask(Task task)
        {
            _task = task;
        }
    }
}

新建 StepParallel 檔案,裡面有兩個類,用於實現同步任務。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace DoFlow.Services
{
    /// <summary>
    /// 第一層所有任務結束後才能跳轉下一個 step
    /// </summary>
    public class StepParallelWhenAll : IStepParallel
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAll()
        {
            _task = new Task(() => { },TaskCreationOptions.AttachedToParent);
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }

        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAll(_tasks).Wait();
            });
        }

        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });

            return _stepBuilder;
        }
    }

    /// <summary>
    /// 完成任意一個任務即可跳轉到下一個 step
    /// </summary>
    public class StepParallelWhenAny : IStepParallelAny
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAny()
        {
            _task = Task.Run(() => { });
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }

        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAny(_tasks).Wait();
            });
        }

        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });

            return _stepBuilder;
        }
    }
}

新建 DoFlowBuilder 檔案,用於構建工作流。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;

namespace DoFlow.Services
{
    public class DoFlowBuilder : IDoFlowBuilder
    {
        private Task _task;
        public Task ThatTask => _task;

        public void EndWith(Action action)
        {
            _task.Start();
        }

        public IStepBuilder StartWith(Action action = null)
        {
            if (action is null)
                _task = new Task(() => { });
            else _task = new Task(action);

            IStepBuilder _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            ((StepBuilder)_stepBuilder).SetTask(_task);
            return _stepBuilder;
        }
    }
}

新建 FlowEngine 檔案,用於執行工作流。

using DoFlow.Interfaces;

namespace DoFlow.Services
{
    /// <summary>
    /// 工作流引擎
    /// </summary>
    public class FlowEngine
    {
        private readonly IDoFlow _flow;
        public FlowEngine(IDoFlow flow)
        {
            _flow = flow;
        }

        /// <summary>
        /// 開始一個工作流
        /// </summary>
        public void Start()
        {
            IDoFlowBuilder builder = DependencyInjectionService.GetService<IDoFlowBuilder>();
            _flow.Build(builder).ThatTask.Start();
        }
    }
}

新建 FlowCore 檔案,用於儲存和索引工作流。使用讀寫鎖解決並行字典問題。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading;

namespace DoFlow.Services
{
    public static class FlowCore
    {
        private static Dictionary<int, FlowEngine> flowEngines = new Dictionary<int, FlowEngine>();

        // 讀寫鎖
        private static ReaderWriterLockSlim readerWriterLockSlim = new ReaderWriterLockSlim();

        /// <summary>
        /// 註冊工作流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow(IDoFlow flow)
        {
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }

        /// <summary>
        /// 註冊工作流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow<TDoFlow>()
        {

            Type type = typeof(TDoFlow);
            IDoFlow flow = (IDoFlow)Activator.CreateInstance(type);
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }

        /// <summary>
        /// 要啟動的工作流
        /// </summary>
        /// <param name="id"></param>
        public static bool Start(int id)
        {
            FlowEngine engine;
            // 讀寫鎖
            try
            {
                readerWriterLockSlim.EnterUpgradeableReadLock();

                if (!flowEngines.ContainsKey(id))
                    return default;
                try
                {
                    readerWriterLockSlim.EnterWriteLock();
                    engine = flowEngines[id];
                }
                catch { return default; }
                finally
                {
                    readerWriterLockSlim.ExitWriteLock();
                }
            }
            catch { return default; }
            finally
            {
                readerWriterLockSlim.ExitUpgradeableReadLock();
            }

            engine.Start();
            return true;
        }
    }
}

就這樣程式寫完了。

到此這篇關於C#多執行緒系列之工作流實現的文章就介紹到這了。希望對大家的學習有所幫助,也希望大家多多支援it145.com。


IT145.com E-mail:sddin#qq.com