noxi雑記

.NET、Angularまわりの小ネタブログ

.NET CoreとSqlBulkCopy

前回の記事で扱ってみたのですが、 .NET には SqlBulkCopy という SQLServer に対して BulkInsert を実行するためのクラスがあります。注釈にはこう記述されています。

Microsoft SQL Server には、1台のサーバーでもサーバー間でも、あるテーブルから別のテーブルにデータを移動するためのbcpという一般的なコマンドプロンプトユーティリティが含まれています。 SqlBulkCopy クラスを使用すると、同様の機能を提供するマネージコードソリューションを作成できます。 SQL Server テーブル (INSERT ステートメントなど) にデータを読み込む方法は他にもありますが、SqlBulkCopy にはパフォーマンス上の大きな利点があります。

SqlBulkCopy クラスは、SQL Server テーブルのみにデータを書き込む場合に使用できます。 ただし、データソースは SQL Server に限定されません。データを DataTable インスタンスに読み込んだり、IDataReader インスタンスで読み取ることができる限り、任意のデータソースを使用できます。

docs.microsoft.com

使用してみると分かりますがデータの転送時間しかかかっていないんじゃ無かろうか、という程度には高速です。次回使用する時用の使用メモを残します。



NuGet パッケージ参照

.NET Core 環境で SqlBulkCopy を使用するには NuGet からパッケージを取得する必要があります。 System.Data.SqlClient または Microsoft.Data.SqlClient が必要です。 System.Data.SqlClientMicrosoft.Data.SqlClient に置き換えられていく運命のため、特に必要が無い限りは Microsoft.Data.SqlClient を使用します。

devblogs.microsoft.com

SqlBulkCopy 使い方

続いて簡単な使い方です。データを転送する方法は大きく2つに別れます。1つは DataReader を使う方法、もう1つは DataTable を使う方法です。両者ともあまり使ったことが無いのですが、前者がデータベースに対する Cursor 、後者がメモリ上のテーブル だと思ってます。両方とも使用してみてその操作感を試してみます。

DataReader を使用する

まずは DataReader を使用するパターンです。これは別に立っている SQLServer やそれ以外のデータベースがデータソースになっている時に使用できます。同一 SQLServer 内のコピーであれば普通に INSERT - SELECT を使いましょう。
今回は MySQL のテーブルデータ10万件をデータソースとして、 SQLServer にコピーしてみます。なお MySQL へ接続するため別途 MySqlConnector を NuGet から参照しています。

using System;
using Microsoft.Data.SqlClient;
using MySql.Data.MySqlClient;

public static class CopyTask
{
    public static void Run()
    {
        // 1. Connection作成
        using (MySqlConnection conn1 = new MySqlConnection("Server=......"))
        using (SqlConnection conn2 = new SqlConnection("Server=......"))
        {
            // 2. MySQLからデータをSELECTするコマンドの作成
            MySqlCommand command = new MySqlCommand("SELECT `Id`, `Name`, `Age`, `Date` FROM `Sample`;", conn1)
            {
                CommandTimeout = (int) TimeSpan.FromHours(1).TotalSeconds
            };

            conn1.Open();
            conn2.Open();

            // 3. コマンドからDataReaderの作成
            using (MySqlDataReader reader = command.ExecuteReader())
            // 4. SqlBulkCopyの作成
            using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(conn2)
            {
                BulkCopyTimeout = (int) TimeSpan.FromHours(1).TotalSeconds,
                DestinationTableName = "Sample",
                EnableStreaming = true,
                NotifyAfter = 1000
            })
            {
                // 5. NotifyAfterで指定した件数分転送されたことを受け取るイベントハンドル
                sqlBulkCopy.SqlRowsCopied += (sender, eventArgs) =>
                {
                    Console.WriteLine("Copied rows {0}", eventArgs.RowsCopied);
                };

                // 6. MySQLからSQLServerへのデータ転送
                sqlBulkCopy.WriteToServer(reader);
            }
        }
    }
}

やっていることはとてもシンプルで、 MySQL から SELECT で取得したデータをそのまま SQLServer へ流しているだけです。テーブル単位にはなりますがとても高速で、同一 Azure リージョン内に配置した Azure Database For MySQL (Basic 1vCore) から SQL Database (Serverless 1vCore) に対して10万行転送するのに10秒かかりませんでした。 C# コードの実行は Azure Functions (従量課金) です。1行あたりのデータ量がとても軽いというのもあったでしょうが、スキーマが同じであればサクッとこれだけでデータコピー出来るのは素敵です。

SqlBulkCopy の NotifyAfter プロパティを設定しておくと指定した行数が SQLServer へ転送される毎に SqlRowsCopied イベントが呼び出されます。ログ出力に利用すると良いでしょう。

DataTable を使用する

続いてデータソースが DataTable のパターンです。こちらは SQLServer へ転送するデータがデータベース以外から取得する際に使用できます。私はこちらのパターンを WebAPI (OData) から取得した JSON データを転送するのに使用しました。

using System;
using System.Data;
using Microsoft.Data.SqlClient;

public static class CopyTask
{
    public static void Run()
    {
        // 1. DataTable作成とカラムの定義
        DataTable dataTable = new DataTable();
        dataTable.Columns.Add(new DataColumn("Id", typeof(int)));
        dataTable.Columns.Add(new DataColumn("Name", typeof(string)));
        dataTable.Columns.Add(new DataColumn("Age", typeof(int)));
        dataTable.Columns.Add(new DataColumn("Data", typeof(DateTime)));

        // 2. DataTableに転送するデータを格納
        for (int i = 0; i < 100000; i++)
        {
            DataRow row = dataTable.NewRow();
            row["Id"] = i;
            row["Name"] = $"Name Name {i}";
            row["Age"] = i;
            row["Date"] = new DateTime(2000, 1, 1).AddHours(i);

            dataTable.Rows.Add(row);
        }

        // 3. SqlBulkCopyの作成
        using (SqlConnection conn = new SqlConnection(ConnectionStringSqlServer))
        using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(conn)
        {
            BulkCopyTimeout = (int) TimeSpan.FromHours(1).TotalSeconds,
            DestinationTableName = "Sample",
            EnableStreaming = false,
            NotifyAfter = 1000
        })
        {
            // 4. NotifyAfterで指定した件数分転送されたことを受け取るイベントハンドル
            sqlBulkCopy.SqlRowsCopied += (sender, eventArgs) =>
            {
                Console.WriteLine("Copied rows {0}", eventArgs.RowsCopied);
            };

            // 5. SQLServerへのデータ転送
            sqlBulkCopy.WriteToServer(dataTable);
        }

    }
}

こちらもやっていることはとてもシンプルで、 DataTable としてテーブル定義を作成してデータを格納、 SQLServer へ転送を実行します。もしかしたら DataTable をストリームとして使用し全データをメモリ上に保持する必要が無い方法があるかもしれませんが私には分かりませんでした。