广州鸿名健康科技有限公司


关于SQL Server 批量插入数据的完美解决方法

网络编程 关于SQL Server 批量插入数据的完美解决方法 09-20

一、Sql Server插入方案介绍

关于 SqlServer 批量插入的方式,有三种比较常用的插入方式,InsertBatchInsertSqlBulkCopy,下面我们对比以下三种方案的速度

1.普通的Insert插入方法

public static void Insert(IEnumerable<Person> persons)
{
using (var con = new SqlConnection(“Server=.;Database=DemoDataBase;User ID=sa;Password=8888;”))
{
con.Open();
foreach (var person in persons)
{
using (var com = new SqlCommand(
“INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES(@Id,@Name,@Age,@CreateTime,@Sex)”,
con))
{
com.Parameters.AddRange(new[]
{
new SqlParameter(“@Id”, SqlDbType.BigInt) {Value = person.Id},
new SqlParameter(“@Name”, SqlDbType.VarChar, 64) {Value = person.Name},
new SqlParameter(“@Age”, SqlDbType.Int) {Value = person.Age},
new SqlParameter(“@CreateTime”, SqlDbType.DateTime)
{Value = person.CreateTime ?? (object) DBNull.Value},
new SqlParameter(“@Sex”, SqlDbType.Int) {Value = (int)person.Sex},
});
com.ExecuteNonQuery();
}
}
}
}

2.拼接BatchInsert插入语句

public static void BatchInsert(Person[] persons)
{
using (var con = new SqlConnection(“Server=.;Database=DemoDataBase;User ID=sa;Password=8888;”))
{
con.Open();
var pageCount = (persons.Length – 1) / 1000 + 1;
for (int i = 0; i < pageCount; i++)
{
var personList = persons.Skip(i * 1000).Take(1000).ToArray();
var values = personList.Select(p =>
$”({p.Id},'{p.Name}’,{p.Age},{(p.CreateTime.HasValue ? $”‘{p.CreateTime:yyyy-MM-dd HH:mm:ss}'” : “NULL”)},{(int) p.Sex})”);
var insertSql =
$”INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES{string.Join(“,”, values)}”;
using (var com = new SqlCommand(insertSql, con))
{
com.ExecuteNonQuery();
}
}
}
}

3.SqlBulkCopy插入方案

public static void BulkCopy(IEnumerable<Person> persons){  using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))  {    con.Open();    var table = new DataTable();    table.Columns.AddRange(new []    {      new DataColumn("Id", typeof(long)),       new DataColumn("Name", typeof(string)),       new DataColumn("Age", typeof(int)),       new DataColumn("CreateTime", typeof(DateTime)),       new DataColumn("Sex", typeof(int)),     });    foreach (var p in persons)    {      table.Rows.Add(new object[] {p.Id, p.Name, p.Age, p.CreateTime, (int) p.Sex});    }    using (var copy = new SqlBulkCopy(con))    {      copy.DestinationTableName = "Person";      copy.WriteToServer(table);    }  }}

3.三种方案速度对比

方案数量时间
Insert1千条145.4351ms
BatchInsert1千条103.9061ms
SqlBulkCopy1千条7.021ms
Insert1万条1501.326ms
BatchInsert1万条850.6274ms
SqlBulkCopy1万条30.5129ms
Insert10万条13875.4934ms
BatchInsert10万条8278.9056ms
SqlBulkCopy10万条314.8402ms

两者插入效率对比,Insert明显比SqlBulkCopy要慢太多,大概20~40倍性能差距,下面我们将SqlBulkCopy封装一下,让批量插入更加方便

二、SqlBulkCopy封装代码

1.方法介绍

批量插入扩展方法签名

方法方法参数介绍
BulkCopy同步的批量插入方法
SqlConnection connectionsql server 连接对象
IEnumerable<T> source需要批量插入的数据源
string tableName = null插入表名称【为NULL默认为实体名称】
int bulkCopyTimeout = 30批量插入超时时间
int batchSize = 0写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default批量复制参数
SqlTransaction externalTransaction = null执行的事务对象
BulkCopyAsync异步的批量插入方法
SqlConnection connectionsql server 连接对象
IEnumerable<T> source需要批量插入的数据源
string tableName = null插入表名称【为NULL默认为实体名称】
int bulkCopyTimeout = 30批量插入超时时间
int batchSize = 0写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default批量复制参数
SqlTransaction externalTransaction = null执行的事务对象

这个方法主要解决了两个问题:

  • 免去了手动构建DataTable或者IDataReader接口实现类,手动构建的转换比较难以维护,如果修改字段就得把这些地方都进行修改,特别是还需要将枚举类型特殊处理,转换成他的基础类型(默认int
  • 不用亲自创建SqlBulkCopy对象,和配置数据库列的映射,和一些属性的配置

此方案也是在我公司中使用,以满足公司的批量插入数据的需求,例如第三方的对账数据此方法使用的是Expression动态生成数据转换函数,其效率和手写的原生代码差不多,和原生手写代码相比,多余的转换损失很小【最大的性能损失都是在值类型拆装箱上】

此方案和其他网上的方案有些不同的是:不是将List先转换成DataTable,然后写入SqlBulkCopy的,而是使用一个实现IDataReader的读取器包装List,每往SqlBulkCopy插入一行数据才会转换一行数据

IDataReader方案和DataTable方案相比优点

效率高:DataTable方案需要先完全转换后,才能交由SqlBulkCopy写入数据库,而IDataReader方案可以边转换边交给SqlBulkCopy写入数据库(例如:10万数据插入速度可提升30%)

占用内存少:DataTable方案需要先完全转换后,才能交由SqlBulkCopy写入数据库,需要占用大量内存,而IDataReader方案可以边转换边交给SqlBulkCopy写入数据库,无须占用过多内存

强大:因为是边写入边转换,而且EnumerableReader传入的是一个迭代器,可以实现持续插入数据的效果

2.实现原理

① 实体Model与表映射

数据库表代码

CREATE TABLE [dbo].[Person](
[Id] [BIGINT] NOT NULL,
[Name] [VARCHAR](64) NOT NULL,
[Age] [INT] NOT NULL,
[CreateTime] [DATETIME] NULL,
[Sex] [INT] NOT NULL,
PRIMARY KEY CLUSTERED
(
[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

实体类代码

public class Person{  public long Id { get; set; }  public string Name { get; set; }  public int Age { get; set; }  public DateTime? CreateTime { get; set; }  public Gender Sex { get; set; }}public enum Gender{  Man = 0,  Woman = 1}
  • 创建字段映射【如果没有此字段映射会导致数据填错位置,如果类型不对还会导致报错】【因为:没有此字段映射默认是按照列序号对应插入的】
  • 创建映射使用的SqlBulkCopy类型的ColumnMappings属性来完成,数据列与数据库中列的映射
//创建批量插入对象using (var copy = new SqlBulkCopy(connection, options, externalTransaction)){  foreach (var column in ModelToDataTable<TModel>.Columns)  {    //创建字段映射    copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);  }}

② 实体转换成数据行

将数据转换成数据行采用的是:反射+Expression来完成

其中反射是用于获取编写Expression所需程序类,属性等信息

其中Expression是用于生成高效转换函数其中ModelToDataTable<TModel>类型利用了静态泛型类特性,实现泛型参数的缓存效果

ModelToDataTable<TModel>的静态构造函数中,生成转换函数,获取需要转换的属性信息,并存入静态只读字段中,完成缓存

③ 使用IDataReader插入数据的重载

EnumerableReader是实现了IDataReader接口的读取类,用于将模型对象,在迭代器中读取出来,并转换成数据行,可供SqlBulkCopy读取

SqlBulkCopy只会调用三个方法:GetOrdinalReadGetValue

  • 其中GetOrdinal只会在首行读取每个列所代表序号【需要填写:SqlBulkCopy类型的ColumnMappings属性】
  • 其中Read方法是迭代到下一行,并调用ModelToDataTable<TModel>.ToRowData.Invoke()来将模型对象转换成数据行object[]
  • 其中GetValue方法是获取当前行指定下标位置的值

3.完整代码

扩展方法类

 public static class SqlConnectionExtension  {    /// <summary>    /// 批量复制    /// </summary>    /// <typeparam name="TModel">插入的模型对象</typeparam>    /// <param name="source">需要批量插入的数据源</param>    /// <param name="connection">数据库连接对象</param>    /// <param name="tableName">插入表名称【为NULL默认为实体名称】</param>    /// <param name="bulkCopyTimeout">插入超时时间</param>    /// <param name="batchSize">写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】</param>    /// <param name="options">批量复制参数</param>    /// <param name="externalTransaction">执行的事务对象</param>    /// <returns>插入数量</returns>    public static int BulkCopy<TModel>(this SqlConnection connection,      IEnumerable<TModel> source,      string tableName = null,      int bulkCopyTimeout = 30,      int batchSize = 0,      SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,      SqlTransaction externalTransaction = null)    {      //创建读取器      using (var reader = new EnumerableReader<TModel>(source))      {        //创建批量插入对象        using (var copy = new SqlBulkCopy(connection, options, externalTransaction))        {          //插入的表          copy.DestinationTableName = tableName ?? typeof(TModel).Name;          //写入数据库一批数量          copy.BatchSize = batchSize;          //超时时间          copy.BulkCopyTimeout = bulkCopyTimeout;          //创建字段映射【如果没有此字段映射会导致数据填错位置,如果类型不对还会导致报错】【因为:没有此字段映射默认是按照列序号对应插入的】          foreach (var column in ModelToDataTable<TModel>.Columns)          {            //创建字段映射            copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);          }          //将数据批量写入数据库          copy.WriteToServer(reader);          //返回插入数据数量          return reader.Depth;        }      }    }    /// <summary>    /// 批量复制-异步    /// </summary>    /// <typeparam name="TModel">插入的模型对象</typeparam>    /// <param name="source">需要批量插入的数据源</param>    /// <param name="connection">数据库连接对象</param>    /// <param name="tableName">插入表名称【为NULL默认为实体名称】</param>    /// <param name="bulkCopyTimeout">插入超时时间</param>    /// <param name="batchSize">写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】</param>    /// <param name="options">批量复制参数</param>    /// <param name="externalTransaction">执行的事务对象</param>    /// <returns>插入数量</returns>    public static async Task<int> BulkCopyAsync<TModel>(this SqlConnection connection,      IEnumerable<TModel> source,      string tableName = null,      int bulkCopyTimeout = 30,      int batchSize = 0,      SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,      SqlTransaction externalTransaction = null)    {      //创建读取器      using (var reader = new EnumerableReader<TModel>(source))      {        //创建批量插入对象        using (var copy = new SqlBulkCopy(connection, options, externalTransaction))        {          //插入的表          copy.DestinationTableName = tableName ?? typeof(TModel).Name;          //写入数据库一批数量          copy.BatchSize = batchSize;          //超时时间          copy.BulkCopyTimeout = bulkCopyTimeout;          //创建字段映射【如果没有此字段映射会导致数据填错位置,如果类型不对还会导致报错】【因为:没有此字段映射默认是按照列序号对应插入的】          foreach (var column in ModelToDataTable<TModel>.Columns)          {            //创建字段映射            copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);          }          //将数据批量写入数据库          await copy.WriteToServerAsync(reader);          //返回插入数据数量          return reader.Depth;        }      }    }  }

封装的迭代器数据读取器

 /// <summary>  /// 迭代器数据读取器  /// </summary>  /// <typeparam name="TModel">模型类型</typeparam>  public class EnumerableReader<TModel> : IDataReader  {    /// <summary>    /// 实例化迭代器读取对象    /// </summary>    /// <param name="source">模型源</param>    public EnumerableReader(IEnumerable<TModel> source)    {      _source = source ?? throw new ArgumentNullException(nameof(source));      _enumerable = source.GetEnumerator();    }    private readonly IEnumerable<TModel> _source;    private readonly IEnumerator<TModel> _enumerable;    private object[] _currentDataRow = Array.Empty<object>();    private int _depth;    private bool _release;    public void Dispose()    {      _release = true;      _enumerable.Dispose();    }    public int GetValues(object[] values)    {      if (values == null) throw new ArgumentNullException(nameof(values));      var length = Math.Min(_currentDataRow.Length, values.Length);      Array.Copy(_currentDataRow, values, length);      return length;    }    public int GetOrdinal(string name)    {      for (int i = 0; i < ModelToDataTable<TModel>.Columns.Count; i++)      {        if (ModelToDataTable<TModel>.Columns[i].ColumnName == name) return i;      }      return -1;    }    public long GetBytes(int ordinal, long dataIndex, byte[] buffer, int bufferIndex, int length)    {      if (dataIndex < 0) throw new Exception($"起始下标不能小于0!");      if (bufferIndex < 0) throw new Exception("目标缓冲区起始下标不能小于0!");      if (length < 0) throw new Exception("读取长度不能小于0!");      var numArray = (byte[])GetValue(ordinal);      if (buffer == null) return numArray.Length;      if (buffer.Length <= bufferIndex) throw new Exception("目标缓冲区起始下标不能大于目标缓冲区范围!");      var freeLength = Math.Min(numArray.Length - bufferIndex, length);      if (freeLength <= 0) return 0;      Array.Copy(numArray, dataIndex, buffer, bufferIndex, length);      return freeLength;    }    public long GetChars(int ordinal, long dataIndex, char[] buffer, int bufferIndex, int length)    {      if (dataIndex < 0) throw new Exception($"起始下标不能小于0!");      if (bufferIndex < 0) throw new Exception("目标缓冲区起始下标不能小于0!");      if (length < 0) throw new Exception("读取长度不能小于0!");      var numArray = (char[])GetValue(ordinal);      if (buffer == null) return numArray.Length;      if (buffer.Length <= bufferIndex) throw new Exception("目标缓冲区起始下标不能大于目标缓冲区范围!");      var freeLength = Math.Min(numArray.Length - bufferIndex, length);      if (freeLength <= 0) return 0;      Array.Copy(numArray, dataIndex, buffer, bufferIndex, length);      return freeLength;    }    public bool IsDBNull(int i)    {      var value = GetValue(i);      return value == null || value is DBNull;    }    public bool NextResult()    {      //移动到下一个元素      if (!_enumerable.MoveNext()) return false;      //行层+1      Interlocked.Increment(ref _depth);      //得到数据行      _currentDataRow = ModelToDataTable<TModel>.ToRowData.Invoke(_enumerable.Current);      return true;    }    public byte GetByte(int i) => (byte)GetValue(i);    public string GetName(int i) => ModelToDataTable<TModel>.Columns[i].ColumnName;    public string GetDataTypeName(int i) => ModelToDataTable<TModel>.Columns[i].DataType.Name;    public Type GetFieldType(int i) => ModelToDataTable<TModel>.Columns[i].DataType;    public object GetValue(int i) => _currentDataRow[i];    public bool GetBoolean(int i) => (bool)GetValue(i);    public char GetChar(int i) => (char)GetValue(i);    public Guid GetGuid(int i) => (Guid)GetValue(i);    public short GetInt16(int i) => (short)GetValue(i);    public int GetInt32(int i) => (int)GetValue(i);    public long GetInt64(int i) => (long)GetValue(i);    public float GetFloat(int i) => (float)GetValue(i);    public double GetDouble(int i) => (double)GetValue(i);    public string GetString(int i) => (string)GetValue(i);    public decimal GetDecimal(int i) => (decimal)GetValue(i);    public DateTime GetDateTime(int i) => (DateTime)GetValue(i);    public IDataReader GetData(int i) => throw new NotSupportedException();    public int FieldCount => ModelToDataTable<TModel>.Columns.Count;    public object this[int i] => GetValue(i);    public object this[string name] => GetValue(GetOrdinal(name));    public void Close() => Dispose();    public DataTable GetSchemaTable() => ModelToDataTable<TModel>.ToDataTable(_source);    public bool Read() => NextResult();    public int Depth => _depth;    public bool IsClosed => _release;    public int RecordsAffected => 0;  }

模型对象转数据行工具类

/// <summary>  /// 对象转换成DataTable转换类  /// </summary>  /// <typeparam name="TModel">泛型类型</typeparam>  public static class ModelToDataTable<TModel>  {    static ModelToDataTable()    {      //如果需要剔除某些列可以修改这段代码      var propertyList = typeof(TModel).GetProperties().Where(w => w.CanRead).ToArray();      Columns = new ReadOnlyCollection<DataColumn>(propertyList        .Select(pr => new DataColumn(pr.Name, GetDataType(pr.PropertyType))).ToArray());      //生成对象转数据行委托      ToRowData = BuildToRowDataDelegation(typeof(TModel), propertyList);    }    /// <summary>    /// 构建转换成数据行委托    /// </summary>    /// <param name="type">传入类型</param>    /// <param name="propertyList">转换的属性</param>    /// <returns>转换数据行委托</returns>    private static Func<TModel, object[]> BuildToRowDataDelegation(Type type, PropertyInfo[] propertyList)    {      var source = Expression.Parameter(type);      var items = propertyList.Select(property => ConvertBindPropertyToData(source, property));      var array = Expression.NewArrayInit(typeof(object), items);      var lambda = Expression.Lambda<Func<TModel, object[]>>(array, source);      return lambda.Compile();    }    /// <summary>    /// 将属性转换成数据    /// </summary>    /// <param name="source">源变量</param>    /// <param name="property">属性信息</param>    /// <returns>获取属性数据表达式</returns>    private static Expression ConvertBindPropertyToData(ParameterExpression source, PropertyInfo property)    {      var propertyType = property.PropertyType;      var expression = (Expression)Expression.Property(source, property);      if (propertyType.IsEnum)        expression = Expression.Convert(expression, propertyType.GetEnumUnderlyingType());      return Expression.Convert(expression, typeof(object));    }    /// <summary>    /// 获取数据类型    /// </summary>    /// <param name="type">属性类型</param>    /// <returns>数据类型</returns>    private static Type GetDataType(Type type)    {      //枚举默认转换成对应的值类型      if (type.IsEnum)        return type.GetEnumUnderlyingType();      //可空类型      if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>))        return GetDataType(type.GetGenericArguments().First());      return type;    }    /// <summary>    /// 列集合    /// </summary>    public static IReadOnlyList<DataColumn> Columns { get; }    /// <summary>    /// 对象转数据行委托    /// </summary>    public static Func<TModel, object[]> ToRowData { get; }    /// <summary>    /// 集合转换成DataTable    /// </summary>    /// <param name="source">集合</param>    /// <param name="tableName">表名称</param>    /// <returns>转换完成的DataTable</returns>    public static DataTable ToDataTable(IEnumerable<TModel> source, string tableName = "TempTable")    {      //创建表对象      var table = new DataTable(tableName);      //设置列      foreach (var dataColumn in Columns)      {        table.Columns.Add(new DataColumn(dataColumn.ColumnName, dataColumn.DataType));      }      //循环转换每一行数据      foreach (var item in source)      {        table.Rows.Add(ToRowData.Invoke(item));      }      //返回表对象      return table;    }  }

三、测试封装代码

1.测试代码

创表代码

CREATE TABLE [dbo].[Person](
[Id] [BIGINT] NOT NULL,
[Name] [VARCHAR](64) NOT NULL,
[Age] [INT] NOT NULL,
[CreateTime] [DATETIME] NULL,
[Sex] [INT] NOT NULL,
PRIMARY KEY CLUSTERED
(
[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

实体类代码

定义的实体的属性名称需要和SqlServer列名称类型对应

public class Person{  public long Id { get; set; }  public string Name { get; set; }  public int Age { get; set; }  public DateTime? CreateTime { get; set; }  public Gender Sex { get; set; }}public enum Gender{  Man = 0,  Woman = 1}

测试方法

//生成10万条数据var persons = new Person[100000];var random = new Random();for (int i = 0; i < persons.Length; i++){  persons[i] = new Person  {    Id = i + 1,    Name = "张三" + i,    Age = random.Next(1, 128),    Sex = (Gender)random.Next(2),    CreateTime = random.Next(2) == 0 ? null : (DateTime?) DateTime.Now.AddSeconds(i)  };}//创建数据库连接using (var conn = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")){  conn.Open();  var sw = Stopwatch.StartNew();  //批量插入数据  var qty = conn.BulkCopy(persons);  sw.Stop();  Console.WriteLine(sw.Elapsed.TotalMilliseconds + "ms");}

执行批量插入结果

226.4767ms
请按任意键继续. . .

四、代码下载

GitHub代码地址:https://github.com/liu-zhen-liang/PackagingComponentsSet/tree/main/SqlBulkCopyComponents


编辑:广州鸿名健康科技有限公司

标签:批量,数据,对象,字段,属性