C#でDynamoDBをバルクで取得・更新する

DynamoDBをバッチなどでアクセスすることを想定して、今回はDynamoDBのデータをバルクで取得・更新させてみます。

BatGetとBatchWrite

DynamoDBをバルクで取得・更新する場合、C#の永続性モデルではDynamoDBContextクラスで提供されるBatchGetおよびBatchWriteを使います。

docs.aws.amazon.com

// バルクで取得する場合
using (var dbContext = new DynamoDBContext(DbClient))
{
    // DynamoDBContextでBatchGetを作成して、
    var batchGet = dbContext.CreateBatchGet<BulkTestTableItem>();
    for (var j = 0; j < recordCount; j++)
    {
        // 取得するアイテムのキーをセットして、
        batchGet.AddKey(j.ToString());
    }
    // ExecuteAsyncで非同期に取得する
    batchGet.ExecuteAsync().Wait();
    var items = batchGet.Results;
}
// バルクで更新する場合
using (var dbContext = new DynamoDBContext(DbClient))
{
    // DynamoDBContextでBatchWriteを作成して、
    var batchWrite = dbContext.CreateBatchWrite<BulkTestTableItem>();
    // アイテムを追加して、
    batchWrite.AddPutItems(items);
    // ExecuteAsyncで非同期に更新する
    batchWrite.ExecuteAsync().Wait();
}

実例と性能測定

実例としてDynamoDBをバルクで取得して、カウンタをインクリメントして、バルクで更新するLamdaを作成します。
また、1件ずつ取得・更新する場合と比較して、どの程度速くなるのかを計測してみます。

DynamoDBテーブルの作成

テスト用に、文字列型のパーティションキーIdのみが設定されたシンプルなDynamoDBテーブル(bulk-test-table)を作成しました。
DynamoDBの読み書きがボトルネックとなることを防ぐため、今回は読み込み・書き込みのキャパシティを100に設定しています。

f:id:ohke:20170309225756j:plain

Lambdaの実装

2パターンで取得・更新する、性能測定用のLambdaを作成します。

  1. 1アイテムずつ取得・更新
  2. 全アイテムをバルクで取得・更新

引数(input)には1回で取得・更新するアイテム数と、試行回数を指定します。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Amazon;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.DataModel;
using Amazon.Lambda.Core;

[assembly: LambdaSerializerAttribute(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]

namespace DynamoReplicationLambda
{
    public class Function
    {
        private static readonly AmazonDynamoDBClient DbClient = new AmazonDynamoDBClient(RegionEndpoint.USWest2);

        // inputは1試行で取得・更新するアイテム数と試行数を指定します("100 10"の場合は、100アイテムの取得・更新を10回試行する)
        public void FunctionHandler(string input, ILambdaContext context)
        {
            var sw1 = new Stopwatch();
            var sw2 = new Stopwatch();

            var recordCount = int.Parse(input.Split(' ')[0]);
            Console.WriteLine("Record count: " + recordCount);

            var tryCount = int.Parse(input.Split(' ')[1]);
            Console.WriteLine("Try count: " + tryCount);

            // 初期化用のデータの投入
            using (var dbContext = new DynamoDBContext(DbClient))
            {
                for (int i = 0; i < recordCount; i++)
                {
                    dbContext.SaveAsync(new BulkTestTableItem
                    {
                        Id = i.ToString(),
                        Count = 0
                    }).Wait();
                }
            }

            // ①1アイテムずつ取得・更新(同期版)
            using (var dbContext = new DynamoDBContext(DbClient))
            {
                sw1.Start();
                for (var i = 0; i < tryCount; i++)
                {
                    for (var j = 0; j < recordCount; j++)
                    {
                        var task = dbContext.LoadAsync<BulkTestTableItem>(hashKey: j.ToString());
                        task.Wait();
                        var item = task.Result;

                        item.Count++;

                        dbContext.SaveAsync(item).Wait();
                    }
                }
                sw1.Stop();
            }

            // ②全アイテムをバルクで取得・更新
            using (var dbContext = new DynamoDBContext(DbClient))
            {
                sw2.Start();
                for (var i = 0; i < tryCount; i++)
                {
                    var batchGet = dbContext.CreateBatchGet<BulkTestTableItem>();
                    for (var j = 0; j < recordCount; j++)
                    {
                        batchGet.AddKey(j.ToString());
                    }
                    batchGet.ExecuteAsync().Wait();
                    var items = batchGet.Results;

                    items = items.Select(item => new BulkTestTableItem {Id = item.Id, Count = item.Count + 1}).ToList();

                    var batchWrite = dbContext.CreateBatchWrite<BulkTestTableItem>();
                    batchWrite.AddPutItems(items);
                    batchWrite.ExecuteAsync().Wait();
                }
                sw2.Stop();
            }

            Console.WriteLine($"①実行時間[ms]: {sw1.ElapsedMilliseconds}");
            Console.WriteLine($"②実行時間[ms]: {sw2.ElapsedMilliseconds}");
        }
    }

    [DynamoDBTable("bulk-test-table")]
    internal class BulkTestTableItem
    {
        [DynamoDBHashKey]
        [DynamoDBProperty(AttributeName = "Id")]
        public string Id { get; set; }
        
        [DynamoDBProperty(AttributeName = "Count")]
        public int Count { get; set; }
    }
}

測定結果

1アイテムずつ1000回取得・更新した場合と、100アイテムずつ10回バルク取得・更新するのにかかった処理時間をそれぞれ示します。
見ての通り、バルクで取得・更新するほうが約23倍高速になっており、効果は大きいことがわかるかと思います。

処理時間[ms]
1アイテムずつ1000アイテムを取得・更新 20678
100アイテムを10回バルクで取得・更新 886

NpgsqlとEntityFramework Coreを使ってPostgreSQLをCRUDする

C#で書かれた.NET CoreアプリケーションからPostgreSQLのデータを操作する機会があり、NpgsqlとEntity Framework Core(EF Core)の使い方を調べました。

Npgsql

NpgsqlPostgreSQL用のADO.NETデータプロバイダで、C#VB.NETからPostgreSQLに接続してSQLを実行するためのライブラリです。
NpgsqlではEF Core用のプロバイダを提供しています。
Npgsql - .NET Access to PostgreSQL | Npgsql Documentation

DBの構築

今回はDocker HubのPostgreSQLコンテナを使ってテスト用のDB環境を作りました。
https://hub.docker.com/_/postgres/

テスト用にmembersテーブルとtodosテーブルを作成します。
membersとtodosは1:Nのリレーションを持っており、membersのidカラムが外部キーとなっています。

members

カラム名 制約
id varchar(32) primary key
name varchar(32) not null
create table members (
    id varchar(32) primary key,
    name varchar(32) not null
);

todos

カラム名 制約
id serial primary key
member_id int foreign key
content varchar(256) not null
due date
done boolean
create table todos (
    id serial primary key,
    member_id varchar(32),
    content varchar(256) not null,
    due date,
    done boolean,
    foreign key (member_id) references members (id)
);

Npgsqlのインストール

project.jsonにNpgsql.EntityFrameworkCore.PostgreSQLの1.1.0(現行最新版)を追記してdotnet restoreでインストールします。

https://github.com/npgsql/Npgsql.EntityFrameworkCore.PostgreSQL

{
  "version": "1.0.0-*",
  "buildOptions": {
    "debugType": "portable",
    "emitEntryPoint": true
  },
  "dependencies": {
    "Npgsql.EntityFrameworkCore.PostgreSQL": "1.1.0"
  },
  "frameworks": {
    "netcoreapp1.1": {
      "dependencies": {
        "Microsoft.NETCore.App": {
          "type": "platform",
          "version": "1.1.0"
        }
      },
      "imports": "dnxcore50"
    }
  }
}

エンティティクラス

アプリケーションの実装に入っていきます。

最初にテーブルレコードとマッピングされるエンティティクラスを作成します。
ここではmembersテーブルレコードに対応するMemberクラス、todosテーブルレコードに対応するTodoクラスを定義しています。

using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;

namespace ConsoleApplication.Models
{
    [Table("members")]
    public class Member
    {
        [Key]
        [Column("id")]
        public string Id { get; set; }

        [Required]
        [Column("name")]
        public string Name { get; set; }

        public virtual ICollection<Todo> Todos { get; set; }
    }

    [Table("todos")]
    public class Todo
    {
        [Key]
        [Column("id")]
        public int Id { get; set; }

        [Required]
        [Column("member_id")]
        public string MemberId { get; set; }

        [Required]
        [Column("content")]
        public string Content { get; set; }

        [Column("due")]
        public DateTime Due { get; set; }

        [Column("done")]
        public bool Done { get; set; }

        public virtual Member Member { get; set; }
    }
}

DBコンテキストクラス

次にDbContextをオーバライドして、今回アクセスするtestデータベース用のDBコンテキストを作ります。
EF CoreではDbContextOptionsBuilderオブジェクトを介して接続文字列やロガーなどの設定をします。 今回はOnConfiguringで先程作成したローカルDBへ接続しています。

using ConsoleApplication.Models;
using Microsoft.EntityFrameworkCore;

namespace ConsoleApplication.Database
{
    public class TestDbContext : DbContext
    {
        public DbSet<Member> Members { get; set; }
        public DbSet<Todo> Todos { get; set; }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseNpgsql("Host=localhost;Username=ohke;Password=ohke;Database=test");
        }
    }
}

データの挿入と参照

ここまで準備ができたら、membersとtodosにレコードをinsertして、さらにselectしてみます。

using System;
using System.Collections.Generic;
using System.Linq;
using ConsoleApplication.Database;
using ConsoleApplication.Models;

namespace ConsoleApplication
{
    public class Program
    {
        public static void Main(string[] args)
        {
            using (var db = new TestDbContext())
            {
                var members = new List<Member>
                {
                    new Member { Id = "ohke1", Name = "ohke1" },
                    new Member { Id = "ohke2", Name = "ohke2" },
                };

                var todos = new List<Todo>
                {
                    new Todo { Member = members[0], Content = "content1", Due = DateTime.Now, Done = false },
                    new Todo { Member = members[0], Content = "content2", Due = DateTime.Now, Done = false },
                    new Todo { Member = members[1], Content = "content3", Due = DateTime.Now, Done = false },
                };
                
                db.Members.AddRange(members);
                db.Todos.AddRange(todos);

                db.SaveChanges();
            }

            using (var db = new TestDbContext())
            {
                // [出力]
                //   ohke1, ohke1
                //   ohke2, ohke2
                foreach (var member in db.Members)
                {
                    Console.WriteLine($"{member.Id}, {member.Name}");
                }

                // [出力]
                //   ohke2, 3, content3
                foreach (var todo in db.Members.Where(m => m.Id == "ohke2").SelectMany(m => m.Todos))
                {
                    Console.WriteLine($"{todo.MemberId}, {todo.Id}, {todo.Content}");
                }
            }
        }
    }
}

実行しているSQLクエリをコンソールに出力する

DBへ発行されているSQLを見たい場合は、DbContextにLoggerFactoryを設定します。
ここではMicrosoft.Extensions.Loggingを使ってConsoleにDebug出力するLoggerFactoryを作成・設定しています。

protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
    optionsBuilder.UseNpgsql("Host=localhost;Username=ohke;Password=ohke;Database=test");

    var loggerFactory = new LoggerFactory().AddConsole().AddDebug();
    optionsBuilder.UseLoggerFactory(loggerFactory);
}
"dependencies": {
    "Npgsql.EntityFrameworkCore.PostgreSQL": "1.1.0",
    "Microsoft.Extensions.Logging": "1.1.0",
    "Microsoft.Extensions.Logging.Console": "1.1.0",
    "Microsoft.Extensions.Logging.Debug": "1.1.0"
},

先程のプログラムで実行されるSQLを見てみますと、membersへの挿入では2つのレコードがinsertされていますが、DBへは1回のリクエストにまとめられています。

info: Microsoft.EntityFrameworkCore.Storage.IRelationalCommandBuilderFactory[1]
      Executed DbCommand (59ms) [Parameters=[@p0='?', @p1='?', @p2='?', @p3='?'], CommandType='Text', CommandTimeout='30']
      INSERT INTO "members" ("id", "name")
      VALUES (@p0, @p1);
      INSERT INTO "members" ("id", "name")
      VALUES (@p2, @p3);
Microsoft.EntityFrameworkCore.Storage.IRelationalCommandBuilderFactory:Information: Executed DbCommand (59ms) [Parameters=[@p0='?', @p1='?', @p2='?', @p3='?'], CommandType='Text', CommandTimeout='30']
INSERT INTO "members" ("id", "name")
VALUES (@p0, @p1);
INSERT INTO "members" ("id", "name")
VALUES (@p2, @p3);

また、ohke2に紐づくtodosレコードの検索ではmember_idをキーとしてinnner joinされており、memberの検索とtodoの検索が別々のSQLクエリで行われるいわゆるN+1問題の発生をEntity Framework側で防いでいることがわかります。

info: Microsoft.EntityFrameworkCore.Storage.IRelationalCommandBuilderFactory[1]
      Executed DbCommand (2ms) [Parameters=[], CommandType='Text', CommandTimeout='30']
      SELECT "m.Todos"."id", "m.Todos"."content", "m.Todos"."done", "m.Todos"."due", "m.Todos"."member_id"
      FROM "members" AS "m"
      INNER JOIN "todos" AS "m.Todos" ON "m"."id" = "m.Todos"."member_id"
      WHERE "m"."id" = 'ohke2'
Microsoft.EntityFrameworkCore.Storage.IRelationalCommandBuilderFactory:Information: Executed DbCommand (2ms) [Parameters=[], CommandType='Text', CommandTimeout='30']
SELECT "m.Todos"."id", "m.Todos"."content", "m.Todos"."done", "m.Todos"."due", "m.Todos"."member_id"
FROM "members" AS "m"
INNER JOIN "todos" AS "m.Todos" ON "m"."id" = "m.Todos"."member_id"
WHERE "m"."id" = 'ohke2'

.NET CoreアプリケーションでNLogを使う

.NET Coreアプリケーションでログ出力にはNLogが良いみたいですね。

NLog

NLogは導入が容易で拡張性が高いログ出力ライブラリで、最近ではlog4netよりも人気があるようです。

github.com

.NET Coreの場合は、NLog.Extensions.Loggingを使います。

github.com

NLogのインストール

project.jsonNLog.Extensions.Loggingを記載してインストールします。

{
  "version": "1.0.0-*",
  "dependencies": {
    "Microsoft.Extensions.Configuration": "1.1.0",
    "NLog.Extensions.Logging": "1.0.0-rtm-beta2"
  },
  "frameworks": {
    "netcoreapp1.1": {
      "dependencies": {
        "Microsoft.NETCore.App": {
          "type": "platform",
          "version": "1.1.0"
        }
      },
      "imports": "dnxcore50"
    }
  }
}

NLog.config

プロジェクトフォルダの直下にNLog.configファイルを作成します。

  • targetsタグ内でログの出力先(ここではfile.txt)を指定します。
  • rulesタグ内で指定のtargetへの出力条件(ここではDebugレベル以上)を記載します。
<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

    <targets>
        <target name="logfile" xsi:type="File" fileName="file.txt" />
    </targets>

    <rules>
        <logger name="*" minlevel="Debug" writeTo="logfile" />
    </rules>
</nlog>

アプリケーション実装

今回はコンソールアプリケーションでログ出力させてみます。
NLog.LogManager#GetCurrentClassLoggerLoggerオブジェクトを取得しています。 引数なしの場合、生成したクラス(ここではProgram)が呼び出し元(caller)のクラスとしてログに出力されます。

using System;
using NLog;

namespace ConsoleApplication
{
    public class Program
    {
        private static Logger _logger = LogManager.GetCurrentClassLogger();

        public static void Main(string[] args)
        {
            _logger.Trace("trace message");
            _logger.Debug("debug message");
            _logger.Info("info message");
            _logger.Warn("warn message");
            _logger.Error("error message");
            _logger.Fatal("fatal message");
        }
    }
}

実行すると、file.txtに以下のように出力されます。
Traceレベルが出力されていないことと、Programが呼び出し元として出力されていることに注意してください。

2017-02-24 21:36:40.1753|DEBUG|Program|debug message
2017-02-24 21:36:40.2848|INFO|Program|info message
2017-02-24 21:36:40.2848|WARN|Program|warn message
2017-02-24 21:36:40.2848|ERROR|Program|error message
2017-02-24 21:36:40.2857|FATAL|Program|fatal message

ログレイアウトをカスタマイズする

NLog.configのtargetsタグではログのレイアウトを細かく設定できます。
例えば↓の感じで設定すると、実行中のクラスとメソッドと行番号まで出力されます。

<targets>
    <target name="logfile" xsi:type="File" fileName="file.txt" 
            layout="${level:uppercase=true:padding=-5} ${longdate} &quot;${message}&quot; ${callsite}#${callsite-linenumber}" />
</targets>
DEBUG 2017-02-24 21:35:09.8425 "debug message" ConsoleApplication.Program.Main#13
INFO  2017-02-24 21:35:10.0356 "info message" ConsoleApplication.Program.Main#14
WARN  2017-02-24 21:35:10.0377 "warn message" ConsoleApplication.Program.Main#15
ERROR 2017-02-24 21:35:10.0391 "error message" ConsoleApplication.Program.Main#16
FATAL 2017-02-24 21:35:10.0421 "fatal message" ConsoleApplication.Program.Main#17

JSONCSVへの出力も全く難しくなく、例えばJSONの場合はlayoutタグでJsonLayoutを指定するだけでOKです。

<targets>
    <target name="logfile" xsi:type="File" fileName="file.txt">
        <layout xsi:type="JsonLayout">
            <attribute name="level" layout="${level}" />
            <attribute name="timestamp" layout="${longdate}" />
            <attribute name="message" layout="${message}" />
            <attribute name="callsite" layout="${callsite}#${callsite-linenumber}" />
        </layout>
    </target>
</targets>
{ "level": "Debug", "timestamp": "2017-02-24 21:27:15.0592", "message": "debug message", "callsite": "ConsoleApplication.Program.Main#13" }

他にもいろいろなレイアウトがあります。
ログの出力クラス(Layout render)は自前で実装することもできます。

ログ出力先を切り替える

例えば、ログレベルがError以上の場合は、通常のログファイルに加えて別のファイルに出力したいといったケースもあるかと思います。
その場合、targetsに通常のログ出力用とErrorログ出力用の2つのtargetを定義して、rulesでログレベルに応じてtargetを切り替えることになります。 下のように設定すると、errorlog.txtにはログレベルがErrorまたはFatalのみが出力されるようになります。

<targets>
    <target name="logfile" xsi:type="File" fileName="log.txt" />
    <target name="errorlogfile" xsi:type="File" fileName="errorlog.txt" />
</targets>
<rules>
    <logger name="*" minlevel="Debug" writeTo="logfile" />
    <logger name="*" minlevel="Error" writeTo="errorlogfile" />
</rules>

出力先にはファイル以外にもメールやDBやコンソールに出力できます。

まとめ

Layout renderやtargetの自前実装について触れませんでしたが、そういった拡張をしなくてもちょっとしたアプリケーションなら十分なログが得られることがわかりました。