首頁 > 軟體

詳解MySqlBulkLoader的使用

2022-07-14 14:04:37

mysql資料庫:最近要寫一個服務,跨庫資料同步,目前資料量大約一萬,以後會越來越多,考慮到擴充套件性,資料的插入操作就採用了MySqlBulkLoader。本文分兩部分來寫,第一部分寫一下MySqlBulkLoader的使用,第二部分記錄使用過程中出現的問題。

一、MySqlBulkLoader的使用

我們先來定義個資料表student,表結構如下:

建立一個core控制檯專案,相關程式碼如下:

入口程式碼:

using System;
using System.Collections.Generic;
namespace MySqlBulkLoaderDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            //裝載30個資料
            List<Student> stuList = new List<Student>();
            for (int i = 0; i < 30; i++)
            {
                stuList.Add(
                new Student
                {
                    Guid = Guid.NewGuid().ToString(),
                    Name = "QXH",
                    Age = new Random().Next(1, 30)
                });
            }
            //呼叫MySqlBulkLoader,往student表中插入stuList
            int insertCount = MySqlBulkLoaderHelper.BulkInsert<Student>(stuList, "student");
            Console.WriteLine($"成功插入{insertCount}條資料");
            Console.ReadKey();
        }
    }
}

定義一個Student對映類:

using System;
using System.Collections.Generic;
using System.Text;

namespace MySqlBulkLoaderDemo
{
    public class Student
    {
        public string Guid { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
    }
}

定義一個MySqlBulkLoaderHelper類,用於存放相關方法:

using MySql.Data.MySqlClient;
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations.Schema;
using System.Data;
using System.IO;
using System.Linq;
using System.Text;

namespace MySqlBulkLoaderDemo
{
    public class MySqlBulkLoaderHelper
    {
        const string ConnectionString = "server=localhost;port=3306;user=root;password=123456;database=mysql;SslMode = none;AllowLoadLocalInfile=true";
        public static int BulkInsert<T>(List<T> entities, string tableName)
        {
            DataTable dt = entities.ToDataTable();
            using (MySqlConnection conn = new MySqlConnection())
            {
                conn.ConnectionString = ConnectionString;
                if (conn.State != ConnectionState.Open)
                {
                    conn.Open();
                }

                if (tableName.IsNullOrEmpty())
                {
                    var tableAttribute = typeof(T).GetCustomAttributes(typeof(TableAttribute), true).FirstOrDefault();
                    if (tableAttribute != null)
                        tableName = ((TableAttribute)tableAttribute).Name;
                    else
                        tableName = typeof(T).Name;
                }

                int insertCount = 0;
                string tmpPath = Path.Combine(Path.GetTempPath(), DateTime.Now.Ticks.ToString() + "_" + Guid.NewGuid().ToString() + ".tmp");
                string csv = dt.ToCsvStr();
                File.WriteAllText(tmpPath, csv, Encoding.UTF8);

                using (MySqlTransaction tran = conn.BeginTransaction())
                {
                    MySqlBulkLoader bulk = new MySqlBulkLoader(conn)
                    {
                        FieldTerminator = ",",
                        FieldQuotationCharacter = '"',
                        EscapeCharacter = '"',
                        LineTerminator = "rn",
                        FileName = tmpPath,
                        Local = true,
                        NumberOfLinesToSkip = 0,
                        TableName = tableName,
                        CharacterSet = "utf8"
                    };
                    try
                    {
                        bulk.Columns.AddRange(dt.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList());
                        insertCount = bulk.Load();
                        tran.Commit();
                    }
                    catch (MySqlException ex)
                    {
                        if (tran != null)
                            tran.Rollback();

                        throw ex;
                    }
                }
                File.Delete(tmpPath);
                return insertCount;
            }

        }
    }
}

定義一個幫助類ExtentionHelper,主要是擴充套件方法:

using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;

namespace MySqlBulkLoaderDemo
{
    public static class ExtentionHelper
    {
        /// <summary>
        /// 將物件序列化成Json字串
        /// </summary>
        /// <param name="obj">需要序列化的物件</param>
        /// <returns></returns>
        public static string ToJson(this object obj)
        {
            return JsonConvert.SerializeObject(obj);
        }
        /// <summary>
        /// 將Json字串轉為DataTable
        /// </summary>
        /// <param name="jsonStr">Json字串</param>
        /// <returns></returns>
        public static DataTable ToDataTable(this string jsonStr)
        {
            return jsonStr == null ? null : JsonConvert.DeserializeObject<DataTable>(jsonStr);
        }
        /// <summary>
        /// 將IEnumerable'T'轉為對應的DataTable
        /// </summary>
        /// <typeparam name="T">資料模型</typeparam>
        /// <param name="iEnumberable">資料來源</param>
        /// <returns>DataTable</returns>
        public static DataTable ToDataTable<T>(this IEnumerable<T> iEnumberable)
        {
            return iEnumberable.ToJson().ToDataTable();
        }
        /// <summary>
        /// 判斷是否為Null或者空
        /// </summary>
        /// <param name="obj">物件</param>
        /// <returns></returns>
        public static bool IsNullOrEmpty(this object obj)
        {
            if (obj == null)
                return true;
            else
            {
                string objStr = obj.ToString();
                return string.IsNullOrEmpty(objStr);
            }
        }

        /// <summary>
        ///將DataTable轉換為標準的CSV字串
        /// </summary>
        /// <param name="dt">資料表</param>
        /// <returns>返回標準的CSV</returns>
        public static string ToCsvStr(this DataTable dt)
        {
            //以半形逗號(即,)作分隔符,列為空也要表達其存在。
            //列內容如存在半形逗號(即,)則用半形引號(即"")將該欄位值包含起來。
            //列內容如存在半形引號(即")則應替換成半形雙引號("")跳脫,並用半形引號(即"")將該欄位值包含起來。
            StringBuilder sb = new StringBuilder();
            DataColumn colum;
            foreach (DataRow row in dt.Rows)
            {
                for (int i = 0; i < dt.Columns.Count; i++)
                {
                    colum = dt.Columns[i];
                    if (i != 0) sb.Append(",");
                    if (colum.DataType == typeof(string) && row[colum].ToString().Contains(","))
                    {
                        sb.Append(""" + row[colum].ToString().Replace(""", """") + """);
                    }
                    else sb.Append(row[colum].ToString());
                }
                sb.AppendLine();
            }

            return sb.ToString();
        }
    }
}

完整專案:MySqlBulkLoaderDemo

執行結果如下:

二、MySqlBulkLoader使用過程中出現的問題

上邊已經完整了介紹了MySqlBulkLoader的使用,但是在使用過程中出現了很多問題,主要集中在兩方面,第一個方面是Mysql資料庫不支援載入本地檔案資料;第二個方面是我的資料庫在阿里雲伺服器上,而程式碼在本地,換句話說資料庫和專案是分別放在不同伺服器上的。

1、Mysql資料庫不支援載入本地檔案資料

(1)MySQLBulkLoader原理?

我們結合SQLBulkCopy來說,用過SqlServer資料庫的都熟悉SQLBulkCopy,很方便,可以直接將datatable中的資料批次匯入到資料庫。與SQLBulkCopy不同,MySQLBulkLoader也稱為LOAD DATA INFILE,他要從檔案讀取資料,所以我們需要將我們的資料集(如上邊的List<Student>)儲存到檔案,然後再從檔案裡面讀取。而對於Mysql來說,為了資料庫的安全,本地匯入檔案的設定沒有開啟,所以使用MySQLBulkLoader批次匯入資料庫,就需要mysql資料庫支援本地匯入檔案。否則會出現以下錯誤:

The used command is not allowed with this MySQL version

(2)解決方案

mysql資料庫開啟允許本地匯入資料的設定,命令如下:

SET GLOBAL local_infile=1;//1表示開啟,0表示關閉

檢視該設定的狀態命令如下:

SHOW VARIABLES LIKE '%local%';

在專案裡面的資料庫連線字串做設定

資料庫連線字串要加上”AllowLoadLocalInfile=true“,如下:

const string ConnectionString = "server=localhost;port=3306;user=root;password=123456;database=mysql;SslMode = none;AllowLoadLocalInfile=true";

2、資料庫和專案是分別放在不同伺服器上

(1)問題描述

資料庫和專案是分別放在不同伺服器上,會造成以下問題:

System.NotSupportedException
HResult=0x80131515
Message=To use MySqlBulkLoader.Local=true, set AllowLoadLocalInfile=true in the connection string. See https://fl.vu/mysql-load-data

(2)原因

因為專案中將資料集生成的檔案儲存在了專案所在的伺服器,另一個伺服器上的資料庫在插入資料操作時,找不到資料集檔案,導致的錯誤

(3)解決方法

方法很簡單,因為資料庫並不在專案所在的伺服器,所以MySqlBulkLoader中要設定Local = true讀取本地檔案,進行匯入。具體程式碼如下:

(4)總結

如果你的專案和資料庫在一臺伺服器上,那麼就不會出現該問題。


IT145.com E-mail:sddin#qq.com