くりーむわーかー

プログラムとか。作ってて ・試しててハマった事など。誰かのお役に立てば幸いかと。 その他、いろいろエトセトラ。。。

バルクインサート

C#でバルクインサート(BulkCopy) 非同期

以前、非同期でBulkCopyするのを書いた。それのちょっと追記版。

MSDN見てたら「WriteToServerAsync」っていうのがあった。っていうか普通に非同期用のメソッドあるんじゃん・・・。とゆーことでこっちを使って書き直す。

//using System;
//using System.Data;
//using System.Data.SqlClient;

string ConnectionString = @"****";
DataTable templateDt = new DataTable();//BulkCopy用のDataTable定義テンプレ

public void sample()
{
    /*サンプルテーブル
    create table HogeTable(
        intval int,
        str1 varchar(100),
        str2 varchar(100),
        str3 varchar(100),
        str4 varchar(100),
    )
    */

    //最初にDataTableの定義を作るのメンドーなので対象テーブルを空SELECTしてSqlDataReaderから定義を作る
    using (SqlConnection cn_ = new SqlConnection(ConnectionString))
    {
        cn_.Open();
        SqlCommand command = new SqlCommand();
        command.CommandText = "select top 0 * from [HogeTable]";
        command.Connection = cn_;
        using (SqlDataReader sqlDr = command.ExecuteReader())
        {
            DataTable schemaDt = sqlDr.GetSchemaTable();
            foreach (DataRow schemaDr in schemaDt.Rows)
            {
                string columnName = schemaDr["ColumnName"].ToString().Trim();
                string dataType = schemaDr["DataType"].ToString().Trim();
                DataColumn dc = new DataColumn();
                dc.ColumnName = columnName;
                dc.DataType = System.Type.GetType(dataType);
                templateDt.Columns.Add(dc);
            }
        }
    }
    Console.WriteLine("同期実行=====");
    Console.WriteLine("Start:{0}", DateTime.Now);
    for (int i = 0; i < 3; i++)//3回やる
    {
        DataTable copyFromDataTable = CreateTestData(500000);//50万件のテストデータ
        using (SqlConnection cn_ = new SqlConnection(ConnectionString))
        {
            cn_.Open();
            using (SqlBulkCopy bulkCopy = new SqlBulkCopy(cn_))
            {
                bulkCopy.DestinationTableName = String.Format("dbo.[{0}]", "HogeTable");
                try
                {
                    bulkCopy.WriteToServer(copyFromDataTable);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
        }
        Console.Write("*");//進捗的な
    }
    Console.WriteLine("");
    Console.WriteLine("End:{0}", DateTime.Now);

    Console.WriteLine("非同期実行=====");
    st = DateTime.Now;
    Console.WriteLine("Start:{0}", DateTime.Now);
    for (int i = 0; i < 3; i++)//3回やる
    {
        DataTable copyFromDataTable = CreateTestData(500000);//50万件のテストデータ
        AsyncSqlBulkCopy(copyFromDataTable, String.Format("dbo.[{0}]", "HogeTable"));
        Console.Write("*");//進捗的な
    }
    Console.WriteLine("");
    ed = DateTime.Now;
    Console.WriteLine("End:{0}", DateTime.Now);

}
//非同期実行用
public async void AsyncSqlBulkCopy(DataTable bulkFrom,string distTableName)
{
    using (SqlConnection conn = new SqlConnection(ConnectionString))
    {
        await conn.OpenAsync();
        using (SqlBulkCopy bcp = new SqlBulkCopy(conn))
        {
            bcp.DestinationTableName = distTableName;
            await bcp.WriteToServerAsync(bulkFrom);
        }
    }
}
//テストデータ作る
public DataTable CreateTestData(int count)
{
    DataTable copyFromDataTable = templateDt.Clone();
    string kyedatetime = DateTime.Now.ToString("yyyyMMddHHmmssfff");
    for (int i = 0; i < count; i++)
    {
        var addRow = copyFromDataTable.NewRow();
        addRow.BeginEdit();
        addRow[0] = i;
        addRow[1] = String.Format("str1_{0}", kyedatetime);
        addRow[2] = String.Format("str2_{0}", kyedatetime);
        addRow[3] = String.Format("str3_{0}", kyedatetime);
        addRow[4] = String.Format("str4_{0}", kyedatetime);
        addRow.EndEdit();
        copyFromDataTable.Rows.Add(addRow);
    }
    return copyFromDataTable;
}

非同期の方が同期実行時の半分くらいの時間だった。まぁデータが登録終わるまでの時間は変わんないけどネ。。。

ちなみにDestinationTableName にはちゃんと"[テーブル名]"の形でテーブル名を指定したほうが吉。[]で囲ってないとやられる場合がある故。

C#でバルクインサート

大量のデータを出来るだけ高速にテーブルに入れたい時の性能的な比較メモ。

試しの環境

DBMS:SQLServer2012
DB機: CPU:2.6G(1コア) RAM:8GB
PG動かした端末 : CPU:2.6G(4コア) RAM:8GB

試しに使ったテーブルは下の定義

create table TEST01(
test01 int,
test02 int,
test03 varchar(100),
test04 varchar(100)
)

最初のためしは1万件のデータをインサート。①1件毎にインサート文を発行、②StringBuilderにインサート文をためて1000件毎に発行。③BulkCopyを使って1000件毎に実行。で下が結果。

①Insertを1件毎
98.32秒
②StringBuilderにInsertをためて、1000件たまったら実行
15.34秒
③Bulkを使って1000件たまったら実行
0.57秒

Bulkが圧倒的でした。この辺りは割とネットでもよく見かける。それじゃつまんないので、前々から非同期で動かしてみたかったので、次はBulkをTaskを使って非同期実行した場合の結果。100万件のデータを10万件毎にBulkCopy。

①Bulkを使って10万件たまったら実行
15.36秒
②Bulkを使って10万件たまったら非同期実行
PG上の戻りは1.8秒
DB上で全件登録されるのが7秒くらい

非同期実行かなり早い。やっぱ制御をさっさと戻したい場合は非同期がいいよね。ただ、やっぱ非同期なので、制御が戻ってきても、DB上の登録が終わってない。PG側が終わるのは2秒くらいだけど、DB側で全件登録終わるのは7秒くらいかかってた。

ついでに、やっぱ非同期なので、データが登録される順番はランダムになるっぽい。順番にインサートする必要があるなら使っちゃダメね。あと、今回はDataTableにデータを入れてBulkCopy実行したけど、実行中はその分のメモリがPGに必要になるので大規模すぎるデータでやる場合は同時に実行するTaskの数に制限をかけたりといろいろ考えないとだめかな。まぁ、当然の話です。

でも、BulkCopyを実行している間に次のデータの読み込みが出来るのはいい感じ。

非同期実行でBulkCopyするサンプルは↓

//バルクコピー本体
public void BulkInsertProc(string targetTable, DataTable wkDt)
{
    string ConnectionString = GetConnectionString();//接続文字列
    using (SqlConnection cn_ = new SqlConnection(ConnectionString))
    {
        try
        {
            SqlBulkCopy bc = new SqlBulkCopy(cn_);
            bc.BulkCopyTimeout = 3600;//適当に1時間
            bc.DestinationTableName = "[" + targetTable + "]";
            cn_.Open();
            bc.WriteToServer(wkDt);
        }
        catch (Exception e)
        {
            throw e;
        }
    }
}
//↑をTaskで呼び出す。
public async void DoBulkAsync(string wkTableName, DataTable wkBulkDt)
{
    await Task.Run(() =>
    {
        BulkInsertProc(wkTableName, wkBulkDt);
    });
}

ちなみにDataTableは定義を作るのが面倒なのでSQLで適当にSELECTしてその結果を使う。↓の感じ。

//SQL発行して結果をDataTableで戻す関数
public DataTable ExecSql(string p_query)
{
    string ConnectionString = GetConnectionString();//接続文字列
    DataTable wkDt = new DataTable();
    try
    {
        using (SqlConnection cn_ = new SqlConnection(ConnectionString))
        {
            cn_.Open();
            SqlCommand command = new SqlCommand();
            command.CommandTimeout = 3600;//タイムアウトの設定
            command.CommandText = p_query.ToString();
            command.Connection = cn_;
            using (SqlDataReader sqlDr = command.ExecuteReader())
            {
                //SQL発行結果の編集
                GetSchemaDataTable(sqlDr, wkDt);//DataTableの定義をSqlDataReaderから作る
                while (sqlDr.Read())
                {
                    DataRow nr = wkDt.NewRow();
                    object[] dtArray = new object[wkDt.Columns.Count];
                    sqlDr.GetValues(dtArray);
                    nr.ItemArray = dtArray;
                    wkDt.Rows.Add(nr);
                }
                wkDt.AcceptChanges();
            }
        }
    }
    catch
    { }
    finally
    { }
    return wkDt;
}
//DataTableの定義をSqlDataReaderから作る関数
private void GetSchemaDataTable(SqlDataReader sqlDr, DataTable wkDt)
{
    DataTable schemaDt = sqlDr.GetSchemaTable();
    foreach (DataRow schemaDr in schemaDt.Rows)
    {
        string columnName = schemaDr["ColumnName"].ToString().Trim();
        string dataType = schemaDr["DataType"].ToString().Trim();
        DataColumn dc = new DataColumn();
        dc.ColumnName = columnName;
        dc.DataType = System.Type.GetType(dataType);
        wkDt.Columns.Add(dc);
    }
}

もっと美しく書けると思うけれども、、、やっつけ。

後は、↑で取ってきた定義のDataTableをテンプレとして持っておいて、 必要な時にCloneでオブジェクト作る。ただの代入だと、参照になって、元も変更されてえらいこっちゃになる。 下の感じ。

DataTable bulkDt = ExecSql("SELECT * FROM TEST01");
DataTable bulkTmpDt = bulkDt.Clone();
//一行追加
var addRow = bulkTmpDt.NewRow();
addRow.BeginEdit();
addRow["test01"] = 1;
addRow["test02"] = 2;
addRow["test03"] = "TEST1";
addRow["test04"] = "TEST2";
addRow.EndEdit();
bulkTmpDt.Rows.Add(addRow);
//BulkCopyの実行
DoBulkAsync("TEST01", bulkTmpDt);

そーいえば、BulkCopyって.Net2.0から使えるんですね・・・。もっと最近のものだとばかり思っていた。なんか損した気分。。。

続きを読む
問合せ