くりーむわーかー

プログラムとか。作ってて ・試しててハマった事など。誰かのお役に立てば幸いかと。 その他、いろいろエトセトラ。。。

BulkCopy

C#でバルクインサート(BulkCopy) 非同期

以前、非同期でBulkCopyするのを書いた。それのちょっと追記版。

MSDN見てたら「WriteToServerAsync」っていうのがあった。っていうか普通に非同期用のメソッドあるんじゃん・・・。とゆーことでこっちを使って書き直す。

//using System;
//using System.Data;
//using System.Data.SqlClient;

string ConnectionString = @"****";
DataTable templateDt = new DataTable();//BulkCopy用のDataTable定義テンプレ

public void sample()
{
    /*サンプルテーブル
    create table HogeTable(
        intval int,
        str1 varchar(100),
        str2 varchar(100),
        str3 varchar(100),
        str4 varchar(100),
    )
    */

    //最初にDataTableの定義を作るのメンドーなので対象テーブルを空SELECTしてSqlDataReaderから定義を作る
    using (SqlConnection cn_ = new SqlConnection(ConnectionString))
    {
        cn_.Open();
        SqlCommand command = new SqlCommand();
        command.CommandText = "select top 0 * from [HogeTable]";
        command.Connection = cn_;
        using (SqlDataReader sqlDr = command.ExecuteReader())
        {
            DataTable schemaDt = sqlDr.GetSchemaTable();
            foreach (DataRow schemaDr in schemaDt.Rows)
            {
                string columnName = schemaDr["ColumnName"].ToString().Trim();
                string dataType = schemaDr["DataType"].ToString().Trim();
                DataColumn dc = new DataColumn();
                dc.ColumnName = columnName;
                dc.DataType = System.Type.GetType(dataType);
                templateDt.Columns.Add(dc);
            }
        }
    }
    Console.WriteLine("同期実行=====");
    Console.WriteLine("Start:{0}", DateTime.Now);
    for (int i = 0; i < 3; i++)//3回やる
    {
        DataTable copyFromDataTable = CreateTestData(500000);//50万件のテストデータ
        using (SqlConnection cn_ = new SqlConnection(ConnectionString))
        {
            cn_.Open();
            using (SqlBulkCopy bulkCopy = new SqlBulkCopy(cn_))
            {
                bulkCopy.DestinationTableName = String.Format("dbo.[{0}]", "HogeTable");
                try
                {
                    bulkCopy.WriteToServer(copyFromDataTable);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
        }
        Console.Write("*");//進捗的な
    }
    Console.WriteLine("");
    Console.WriteLine("End:{0}", DateTime.Now);

    Console.WriteLine("非同期実行=====");
    st = DateTime.Now;
    Console.WriteLine("Start:{0}", DateTime.Now);
    for (int i = 0; i < 3; i++)//3回やる
    {
        DataTable copyFromDataTable = CreateTestData(500000);//50万件のテストデータ
        AsyncSqlBulkCopy(copyFromDataTable, String.Format("dbo.[{0}]", "HogeTable"));
        Console.Write("*");//進捗的な
    }
    Console.WriteLine("");
    ed = DateTime.Now;
    Console.WriteLine("End:{0}", DateTime.Now);

}
//非同期実行用
public async void AsyncSqlBulkCopy(DataTable bulkFrom,string distTableName)
{
    using (SqlConnection conn = new SqlConnection(ConnectionString))
    {
        await conn.OpenAsync();
        using (SqlBulkCopy bcp = new SqlBulkCopy(conn))
        {
            bcp.DestinationTableName = distTableName;
            await bcp.WriteToServerAsync(bulkFrom);
        }
    }
}
//テストデータ作る
public DataTable CreateTestData(int count)
{
    DataTable copyFromDataTable = templateDt.Clone();
    string kyedatetime = DateTime.Now.ToString("yyyyMMddHHmmssfff");
    for (int i = 0; i < count; i++)
    {
        var addRow = copyFromDataTable.NewRow();
        addRow.BeginEdit();
        addRow[0] = i;
        addRow[1] = String.Format("str1_{0}", kyedatetime);
        addRow[2] = String.Format("str2_{0}", kyedatetime);
        addRow[3] = String.Format("str3_{0}", kyedatetime);
        addRow[4] = String.Format("str4_{0}", kyedatetime);
        addRow.EndEdit();
        copyFromDataTable.Rows.Add(addRow);
    }
    return copyFromDataTable;
}

非同期の方が同期実行時の半分くらいの時間だった。まぁデータが登録終わるまでの時間は変わんないけどネ。。。

ちなみにDestinationTableName にはちゃんと"[テーブル名]"の形でテーブル名を指定したほうが吉。[]で囲ってないとやられる場合がある故。

MSSQLServerからPostgreSQLへのデータ移行をC#で書く

タイトル通り。最近、SQLServerからPostgreSQLへの切り替えをやってる。CreateTableしてー、CSVにデータはいてー、Postgreにインポートしてー。みたいなのはググればいっぱい出てきますが、正直、
クッソだるい

何がだるいかとゆーと、ってか実際には上の感じの作業はしてないので、もしかしたらすんなり行くのかもしれませんが、十中八九、以下の点ではまるのが目に見えている。

  1. テキスト系で改行とかエスケープしないとダメなのあったらどーせ無理じゃない?
  2. 100を超えるテーブルのCreate文なんか流したくない
  3. 100を超えるテーブルのインポートなんかしたくない
  4. バイナリ型ってCSV吐いただけで行けるのかしら?
などなど。ハマルのが目に見えている。

とゆーわけで、PostgreSQLをC#でごにょごにょするのに慣れる意味も含めてプログラムでやった。

SQLServerへの接続とかはデフォでほぼ行けるからどーでもいいとして、PostgreSQLを扱う場合は「Npgsql」を使う。インストールはNugetから。

あと、一件づつインサートとかはありえないので、.netでいうBulkCopyもしたい。

プログラムの流れは↓の感じ。

①SQLServerで移行したいDBのテーブルの定義を丸ごと取得
②①の中でPostgre用のCreateTableを作っておく。
③Postgreに②のCreateTableを発行
④SQLServer側のテーブルをSelect *して、Postgreに順次BulkCopy
⑤Postgre側にIndexつける
⑥VACUUM実行

大したことはやってないけど、確実に手でやるより早い。

①のSQLServerの定義取得は↓の感じのクエリで丸ごととってくる

select 
X.object_id
,X.name as tablename
,cast(Y.column_id as int) as column_id
,Y.name as colname
,cast(Y.system_type_id as int) as system_type_id
,TYPE_NAME(Y.system_type_id) as typename
,cast(Y.max_length as int) as max_length
,cast(Y.precision as int) as precision
,cast(Y.scale as int) as scale
,cast(Y.is_nullable as int) as is_nullable
,cast(Y.is_identity as int) as is_identity 
,cast(isnull(Z.is_primary_key,0) as int) as is_primary_key
from (
	select * from sys.tables
	where type = 'U' 
	and name not like '%[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]%'
) as X
inner join (
	select * from sys.columns
) as Y
on X.object_id = Y.object_id
left join (
	select X.object_id,Y.column_id,Z.is_primary_key from sys.columns as X
	inner join sys.index_columns as Y
	on X.object_id = Y.object_id
	and X.column_id = Y.column_id
	inner join sys.indexes as Z
	on Y.object_id = Z.object_id
	and Y.index_id = Z.index_id
	and X.object_id = Z.object_id
	where Z.is_primary_key = 1
) as Z
on Y.object_id = Z.object_id
and Y.column_id = Z.column_id
order by X.object_id,Y.column_id

主キーの判定がちょっとメンドー。あと、テーブル名で西暦8ケタ付きのバックアップぽいテーブルは除くようにしてみたりしてる。あと、数値系の項目ってどれがbyte型か調べるのめんどーになったので、全部強制的にIntに変換。ランボー過ぎかな?

定義が取れれば、Create文はテキトーに作れるでしょう。

そしたら、Postgre側にテーブル作成を投げる。下の感じ。公式のままだ。

//EF使ってるけど、クエリを発行したいのでEFの接続文字列を無理やり取得
string constr= this.Database.Connection.ConnectionString;
using (var conn = new NpgsqlConnection(constr))
{
    conn.Open();
    using (var cmd = new NpgsqlCommand())
    {
        cmd.Connection = conn;
        try
        {
            //何回か流すようにテーブルドロップ。初回は落ちるのでtryしておく。これもランボー。
            cmd.CommandText = String.Format("DROP TABLE dbo.\"{0}\";",pDef.tablename);
            cmd.ExecuteNonQuery();
        }
        catch { }
        cmd.CommandText = pDef.createSql;
        cmd.ExecuteNonQuery();
    }
}

そしたら次はBulkCopy。ロジックは抜粋で。

using (SqlConnection cn_ = new SqlConnection(ConnectionString))
{
    cn_.Open();

    SqlCommand command = new SqlCommand();
    command.CommandTimeout = 3600;//タイムアウトの設定
    command.CommandText = "select * from [テーブル名]";
    command.Connection = cn_;
    //SQLServerにSelect投げる
    using (SqlDataReader sqlDr = command.ExecuteReader())
    {
        DataTable schemaDt = sqlDr.GetSchemaTable();
        //Postgreにつなぐ
        using (var conn = new NpgsqlConnection(pgConStr))
        {
            conn.Open();
            //Postgreのbulk
            using (var writer = conn.BeginBinaryImport(String.Format("COPY dbo.\"{0}\" ({1}) FROM STDIN (FORMAT BINARY)", "テーブル名","テーブルの列のリスト")))
            {
                while (sqlDr.Read())//SQLServerのSelect結果
                {
                    writer.StartRow();//コピーする行ごとに必要っぽい
                    for(int i=0;i< schemaDt.Columns.Count; i++)
                    {
                        //列毎にカキカキ
                        writer.Write(sqlDr[i],getColDef(pDef, i));
                    }
                }
            }
        }
    }
}
//SQLServerのデータ型に応じてPostgre側の型定義を返す
public NpgsqlDbType getColDef(MSDBTableDef pDef, int i)
{

    switch (pDef.coldeflist[i].system_type_id)
    {
        case 56://int
            return NpgsqlDbType.Integer;
        case 127://bigint
            return NpgsqlDbType.Bigint;
        case 167://varchar
        case 231://nvarchar
            if(pDef.coldeflist[i].max_length < 0)
                return NpgsqlDbType.Text;
            else
                return NpgsqlDbType.Varchar;
        case 106://decimal
            return NpgsqlDbType.Numeric;
        case 61://datetime
            return NpgsqlDbType.Timestamp;
        case 165://varbinary
            return NpgsqlDbType.Bytea;
    }
    return NpgsqlDbType.Varchar;
}

Npgsqlのバルクコピーは↓の感じで、Objectの配列として渡してもOKな気がする。

object[] dtArray = new object[schemaDt.Columns.Count];
sqlDr.GetValues(dtArray);
writer.WriteRow(dtArray);

ただ、公式のこの辺に「NpgsqlDbTypeでデータの型をちゃんと指定する事を激しくお勧めする」って書いてあるのでそーした。データ型の指定はいつの時代になってもとっても重要。

あと、インデックスはデータ入れ終わってから、まとめてやった方が多分よいと思う。速度的な意味で。

最後に、対象のDBに対して「VACUUM」ってコマンドを投げてあげる。不要領域の削除とかなんかもろもろやるらし。テーブルロックとかされる場合もあるらしいので、誰か触ってる時にはやらない方がいい。

でだ、NpgsqlのCopyはいいんだけど、Writeのタイミングで書きに行ってるわけじゃないよね?なんだかいつ実行してるのかロジック見る感じでよくわからない。多分、usingしてるし、Disposeされるときにやってるのかなと愚考。個人的に明示してくれる方がわかりやすい。

C#でバルクインサート

大量のデータを出来るだけ高速にテーブルに入れたい時の性能的な比較メモ。

試しの環境

DBMS:SQLServer2012
DB機: CPU:2.6G(1コア) RAM:8GB
PG動かした端末 : CPU:2.6G(4コア) RAM:8GB

試しに使ったテーブルは下の定義

create table TEST01(
test01 int,
test02 int,
test03 varchar(100),
test04 varchar(100)
)

最初のためしは1万件のデータをインサート。①1件毎にインサート文を発行、②StringBuilderにインサート文をためて1000件毎に発行。③BulkCopyを使って1000件毎に実行。で下が結果。

①Insertを1件毎
98.32秒
②StringBuilderにInsertをためて、1000件たまったら実行
15.34秒
③Bulkを使って1000件たまったら実行
0.57秒

Bulkが圧倒的でした。この辺りは割とネットでもよく見かける。それじゃつまんないので、前々から非同期で動かしてみたかったので、次はBulkをTaskを使って非同期実行した場合の結果。100万件のデータを10万件毎にBulkCopy。

①Bulkを使って10万件たまったら実行
15.36秒
②Bulkを使って10万件たまったら非同期実行
PG上の戻りは1.8秒
DB上で全件登録されるのが7秒くらい

非同期実行かなり早い。やっぱ制御をさっさと戻したい場合は非同期がいいよね。ただ、やっぱ非同期なので、制御が戻ってきても、DB上の登録が終わってない。PG側が終わるのは2秒くらいだけど、DB側で全件登録終わるのは7秒くらいかかってた。

ついでに、やっぱ非同期なので、データが登録される順番はランダムになるっぽい。順番にインサートする必要があるなら使っちゃダメね。あと、今回はDataTableにデータを入れてBulkCopy実行したけど、実行中はその分のメモリがPGに必要になるので大規模すぎるデータでやる場合は同時に実行するTaskの数に制限をかけたりといろいろ考えないとだめかな。まぁ、当然の話です。

でも、BulkCopyを実行している間に次のデータの読み込みが出来るのはいい感じ。

非同期実行でBulkCopyするサンプルは↓

//バルクコピー本体
public void BulkInsertProc(string targetTable, DataTable wkDt)
{
    string ConnectionString = GetConnectionString();//接続文字列
    using (SqlConnection cn_ = new SqlConnection(ConnectionString))
    {
        try
        {
            SqlBulkCopy bc = new SqlBulkCopy(cn_);
            bc.BulkCopyTimeout = 3600;//適当に1時間
            bc.DestinationTableName = "[" + targetTable + "]";
            cn_.Open();
            bc.WriteToServer(wkDt);
        }
        catch (Exception e)
        {
            throw e;
        }
    }
}
//↑をTaskで呼び出す。
public async void DoBulkAsync(string wkTableName, DataTable wkBulkDt)
{
    await Task.Run(() =>
    {
        BulkInsertProc(wkTableName, wkBulkDt);
    });
}

ちなみにDataTableは定義を作るのが面倒なのでSQLで適当にSELECTしてその結果を使う。↓の感じ。

//SQL発行して結果をDataTableで戻す関数
public DataTable ExecSql(string p_query)
{
    string ConnectionString = GetConnectionString();//接続文字列
    DataTable wkDt = new DataTable();
    try
    {
        using (SqlConnection cn_ = new SqlConnection(ConnectionString))
        {
            cn_.Open();
            SqlCommand command = new SqlCommand();
            command.CommandTimeout = 3600;//タイムアウトの設定
            command.CommandText = p_query.ToString();
            command.Connection = cn_;
            using (SqlDataReader sqlDr = command.ExecuteReader())
            {
                //SQL発行結果の編集
                GetSchemaDataTable(sqlDr, wkDt);//DataTableの定義をSqlDataReaderから作る
                while (sqlDr.Read())
                {
                    DataRow nr = wkDt.NewRow();
                    object[] dtArray = new object[wkDt.Columns.Count];
                    sqlDr.GetValues(dtArray);
                    nr.ItemArray = dtArray;
                    wkDt.Rows.Add(nr);
                }
                wkDt.AcceptChanges();
            }
        }
    }
    catch
    { }
    finally
    { }
    return wkDt;
}
//DataTableの定義をSqlDataReaderから作る関数
private void GetSchemaDataTable(SqlDataReader sqlDr, DataTable wkDt)
{
    DataTable schemaDt = sqlDr.GetSchemaTable();
    foreach (DataRow schemaDr in schemaDt.Rows)
    {
        string columnName = schemaDr["ColumnName"].ToString().Trim();
        string dataType = schemaDr["DataType"].ToString().Trim();
        DataColumn dc = new DataColumn();
        dc.ColumnName = columnName;
        dc.DataType = System.Type.GetType(dataType);
        wkDt.Columns.Add(dc);
    }
}

もっと美しく書けると思うけれども、、、やっつけ。

後は、↑で取ってきた定義のDataTableをテンプレとして持っておいて、 必要な時にCloneでオブジェクト作る。ただの代入だと、参照になって、元も変更されてえらいこっちゃになる。 下の感じ。

DataTable bulkDt = ExecSql("SELECT * FROM TEST01");
DataTable bulkTmpDt = bulkDt.Clone();
//一行追加
var addRow = bulkTmpDt.NewRow();
addRow.BeginEdit();
addRow["test01"] = 1;
addRow["test02"] = 2;
addRow["test03"] = "TEST1";
addRow["test04"] = "TEST2";
addRow.EndEdit();
bulkTmpDt.Rows.Add(addRow);
//BulkCopyの実行
DoBulkAsync("TEST01", bulkTmpDt);

そーいえば、BulkCopyって.Net2.0から使えるんですね・・・。もっと最近のものだとばかり思っていた。なんか損した気分。。。

続きを読む
問合せ