C#でDynamoDBをバルクで取得・更新する

DynamoDBをバッチなどでアクセスすることを想定して、今回はDynamoDBのデータをバルクで取得・更新させてみます。

BatGetとBatchWrite

DynamoDBをバルクで取得・更新する場合、C#の永続性モデルではDynamoDBContextクラスで提供されるBatchGetおよびBatchWriteを使います。

docs.aws.amazon.com

// バルクで取得する場合
using (var dbContext = new DynamoDBContext(DbClient))
{
    // DynamoDBContextでBatchGetを作成して、
    var batchGet = dbContext.CreateBatchGet<BulkTestTableItem>();
    for (var j = 0; j < recordCount; j++)
    {
        // 取得するアイテムのキーをセットして、
        batchGet.AddKey(j.ToString());
    }
    // ExecuteAsyncで非同期に取得する
    batchGet.ExecuteAsync().Wait();
    var items = batchGet.Results;
}
// バルクで更新する場合
using (var dbContext = new DynamoDBContext(DbClient))
{
    // DynamoDBContextでBatchWriteを作成して、
    var batchWrite = dbContext.CreateBatchWrite<BulkTestTableItem>();
    // アイテムを追加して、
    batchWrite.AddPutItems(items);
    // ExecuteAsyncで非同期に更新する
    batchWrite.ExecuteAsync().Wait();
}

実例と性能測定

実例としてDynamoDBをバルクで取得して、カウンタをインクリメントして、バルクで更新するLamdaを作成します。
また、1件ずつ取得・更新する場合と比較して、どの程度速くなるのかを計測してみます。

DynamoDBテーブルの作成

テスト用に、文字列型のパーティションキーIdのみが設定されたシンプルなDynamoDBテーブル(bulk-test-table)を作成しました。
DynamoDBの読み書きがボトルネックとなることを防ぐため、今回は読み込み・書き込みのキャパシティを100に設定しています。

f:id:ohke:20170309225756j:plain

Lambdaの実装

2パターンで取得・更新する、性能測定用のLambdaを作成します。

  1. 1アイテムずつ取得・更新
  2. 全アイテムをバルクで取得・更新

引数(input)には1回で取得・更新するアイテム数と、試行回数を指定します。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Amazon;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.DataModel;
using Amazon.Lambda.Core;

[assembly: LambdaSerializerAttribute(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]

namespace DynamoReplicationLambda
{
    public class Function
    {
        private static readonly AmazonDynamoDBClient DbClient = new AmazonDynamoDBClient(RegionEndpoint.USWest2);

        // inputは1試行で取得・更新するアイテム数と試行数を指定します("100 10"の場合は、100アイテムの取得・更新を10回試行する)
        public void FunctionHandler(string input, ILambdaContext context)
        {
            var sw1 = new Stopwatch();
            var sw2 = new Stopwatch();

            var recordCount = int.Parse(input.Split(' ')[0]);
            Console.WriteLine("Record count: " + recordCount);

            var tryCount = int.Parse(input.Split(' ')[1]);
            Console.WriteLine("Try count: " + tryCount);

            // 初期化用のデータの投入
            using (var dbContext = new DynamoDBContext(DbClient))
            {
                for (int i = 0; i < recordCount; i++)
                {
                    dbContext.SaveAsync(new BulkTestTableItem
                    {
                        Id = i.ToString(),
                        Count = 0
                    }).Wait();
                }
            }

            // ①1アイテムずつ取得・更新(同期版)
            using (var dbContext = new DynamoDBContext(DbClient))
            {
                sw1.Start();
                for (var i = 0; i < tryCount; i++)
                {
                    for (var j = 0; j < recordCount; j++)
                    {
                        var task = dbContext.LoadAsync<BulkTestTableItem>(hashKey: j.ToString());
                        task.Wait();
                        var item = task.Result;

                        item.Count++;

                        dbContext.SaveAsync(item).Wait();
                    }
                }
                sw1.Stop();
            }

            // ②全アイテムをバルクで取得・更新
            using (var dbContext = new DynamoDBContext(DbClient))
            {
                sw2.Start();
                for (var i = 0; i < tryCount; i++)
                {
                    var batchGet = dbContext.CreateBatchGet<BulkTestTableItem>();
                    for (var j = 0; j < recordCount; j++)
                    {
                        batchGet.AddKey(j.ToString());
                    }
                    batchGet.ExecuteAsync().Wait();
                    var items = batchGet.Results;

                    items = items.Select(item => new BulkTestTableItem {Id = item.Id, Count = item.Count + 1}).ToList();

                    var batchWrite = dbContext.CreateBatchWrite<BulkTestTableItem>();
                    batchWrite.AddPutItems(items);
                    batchWrite.ExecuteAsync().Wait();
                }
                sw2.Stop();
            }

            Console.WriteLine($"①実行時間[ms]: {sw1.ElapsedMilliseconds}");
            Console.WriteLine($"②実行時間[ms]: {sw2.ElapsedMilliseconds}");
        }
    }

    [DynamoDBTable("bulk-test-table")]
    internal class BulkTestTableItem
    {
        [DynamoDBHashKey]
        [DynamoDBProperty(AttributeName = "Id")]
        public string Id { get; set; }
        
        [DynamoDBProperty(AttributeName = "Count")]
        public int Count { get; set; }
    }
}

測定結果

1アイテムずつ1000回取得・更新した場合と、100アイテムずつ10回バルク取得・更新するのにかかった処理時間をそれぞれ示します。
見ての通り、バルクで取得・更新するほうが約23倍高速になっており、効果は大きいことがわかるかと思います。

処理時間[ms]
1アイテムずつ1000アイテムを取得・更新 20678
100アイテムを10回バルクで取得・更新 886