DynamoDBをバッチなどでアクセスすることを想定して、今回はDynamoDBのデータをバルクで取得・更新させてみます。
BatGetとBatchWrite
DynamoDBをバルクで取得・更新する場合、C#の永続性モデルではDynamoDBContextクラスで提供されるBatchGetおよびBatchWriteを使います。
// バルクで取得する場合 using (var dbContext = new DynamoDBContext(DbClient)) { // DynamoDBContextでBatchGetを作成して、 var batchGet = dbContext.CreateBatchGet<BulkTestTableItem>(); for (var j = 0; j < recordCount; j++) { // 取得するアイテムのキーをセットして、 batchGet.AddKey(j.ToString()); } // ExecuteAsyncで非同期に取得する batchGet.ExecuteAsync().Wait(); var items = batchGet.Results; }
// バルクで更新する場合 using (var dbContext = new DynamoDBContext(DbClient)) { // DynamoDBContextでBatchWriteを作成して、 var batchWrite = dbContext.CreateBatchWrite<BulkTestTableItem>(); // アイテムを追加して、 batchWrite.AddPutItems(items); // ExecuteAsyncで非同期に更新する batchWrite.ExecuteAsync().Wait(); }
実例と性能測定
実例としてDynamoDBをバルクで取得して、カウンタをインクリメントして、バルクで更新するLamdaを作成します。
また、1件ずつ取得・更新する場合と比較して、どの程度速くなるのかを計測してみます。
DynamoDBテーブルの作成
テスト用に、文字列型のパーティションキーIdのみが設定されたシンプルなDynamoDBテーブル(bulk-test-table)を作成しました。
DynamoDBの読み書きがボトルネックとなることを防ぐため、今回は読み込み・書き込みのキャパシティを100に設定しています。
Lambdaの実装
2パターンで取得・更新する、性能測定用のLambdaを作成します。
- 1アイテムずつ取得・更新
- 全アイテムをバルクで取得・更新
引数(input)には1回で取得・更新するアイテム数と、試行回数を指定します。
using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using Amazon; using Amazon.DynamoDBv2; using Amazon.DynamoDBv2.DataModel; using Amazon.Lambda.Core; [assembly: LambdaSerializerAttribute(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))] namespace DynamoReplicationLambda { public class Function { private static readonly AmazonDynamoDBClient DbClient = new AmazonDynamoDBClient(RegionEndpoint.USWest2); // inputは1試行で取得・更新するアイテム数と試行数を指定します("100 10"の場合は、100アイテムの取得・更新を10回試行する) public void FunctionHandler(string input, ILambdaContext context) { var sw1 = new Stopwatch(); var sw2 = new Stopwatch(); var recordCount = int.Parse(input.Split(' ')[0]); Console.WriteLine("Record count: " + recordCount); var tryCount = int.Parse(input.Split(' ')[1]); Console.WriteLine("Try count: " + tryCount); // 初期化用のデータの投入 using (var dbContext = new DynamoDBContext(DbClient)) { for (int i = 0; i < recordCount; i++) { dbContext.SaveAsync(new BulkTestTableItem { Id = i.ToString(), Count = 0 }).Wait(); } } // ①1アイテムずつ取得・更新(同期版) using (var dbContext = new DynamoDBContext(DbClient)) { sw1.Start(); for (var i = 0; i < tryCount; i++) { for (var j = 0; j < recordCount; j++) { var task = dbContext.LoadAsync<BulkTestTableItem>(hashKey: j.ToString()); task.Wait(); var item = task.Result; item.Count++; dbContext.SaveAsync(item).Wait(); } } sw1.Stop(); } // ②全アイテムをバルクで取得・更新 using (var dbContext = new DynamoDBContext(DbClient)) { sw2.Start(); for (var i = 0; i < tryCount; i++) { var batchGet = dbContext.CreateBatchGet<BulkTestTableItem>(); for (var j = 0; j < recordCount; j++) { batchGet.AddKey(j.ToString()); } batchGet.ExecuteAsync().Wait(); var items = batchGet.Results; items = items.Select(item => new BulkTestTableItem {Id = item.Id, Count = item.Count + 1}).ToList(); var batchWrite = dbContext.CreateBatchWrite<BulkTestTableItem>(); batchWrite.AddPutItems(items); batchWrite.ExecuteAsync().Wait(); } sw2.Stop(); } Console.WriteLine($"①実行時間[ms]: {sw1.ElapsedMilliseconds}"); Console.WriteLine($"②実行時間[ms]: {sw2.ElapsedMilliseconds}"); } } [DynamoDBTable("bulk-test-table")] internal class BulkTestTableItem { [DynamoDBHashKey] [DynamoDBProperty(AttributeName = "Id")] public string Id { get; set; } [DynamoDBProperty(AttributeName = "Count")] public int Count { get; set; } } }
測定結果
1アイテムずつ1000回取得・更新した場合と、100アイテムずつ10回バルク取得・更新するのにかかった処理時間をそれぞれ示します。
見ての通り、バルクで取得・更新するほうが約23倍高速になっており、効果は大きいことがわかるかと思います。
処理時間[ms] | |
---|---|
1アイテムずつ1000アイテムを取得・更新 | 20678 |
100アイテムを10回バルクで取得・更新 | 886 |