DynamoDB StreamsをLambda(C#)で使ってみました。
DynamoDB Streams
DynamoDBにはテーブルへの更新(作成、変更、削除)をキャプチャし、その情報(レコード)をバッファリングするストリームを作成することができる「DynamoDB Streams」という機能が提供されています。
レコードは1回のみ現れること・項目の更新順にレコードが取得されること・レコードは24時間保持されることが保証されており、Kinesis Streamとほぼ同じように扱うことができます。
今回はこのDynamoDB Streamsを使って、DynamoDBテーブルの更新を別のDynamoDBテーブルへ同期させるLambdaアプリケーションを作ります。
DynamoDBの作成
同期元となるmaster-table
と同期先となるreplica-table
を作成します。
いずれもパーティションキーKey
が設定されているだけです。
Lambdaの実装
これまで同様、AWS ToolsをインストールしたVisual StudioからLambdaプロジェクトを作成します。
まずは、project.jsonにAmazon.Lambda.DynamoDBEvents
とAWSSDK.DynamoDBv2
を追記して、パッケージをインストールします。
DynamoDBEventsは後述するDynamoDBEventクラスで、DynamoDBv2はreplica-tableの更新で、それぞれ必要となります
{ "version": "1.0.0-*", "buildOptions": { }, "dependencies": { "Microsoft.NETCore.App": { "type": "platform", "version": "1.0.0" }, "Amazon.Lambda.Core": "1.0.0*", "Amazon.Lambda.Serialization.Json": "1.0.1", "Amazon.Lambda.Tools": { "type": "build", "version": "1.2.1-preview1" }, "Amazon.Lambda.DynamoDBEvents": "1.0.0", "AWSSDK.DynamoDBv2": "3.3.1.5" }, "tools": { "Amazon.Lambda.Tools" : "1.2.1-preview1" }, "frameworks": { "netcoreapp1.0": { "imports": "dnxcore50" } } }
Lambdaのソースコード(デフォルトであればFunction.cs)を記述します。
ポイントはハンドラの引数として渡されるAmazon.Lambda.DynamoDbEvents.DynamoDBEvent
オブジェクトで、このオブジェクトの中にどのテーブルのどの項目に対してどんな更新(作成 or 変更 or 削除)が行われたのか、といった情報が詰められています。
今回はこのDynamoDBEventオブジェクトを解析して、同じ変更をreplica-tableに対しても行います。
例えば、EventName
が"INSERT"の場合は、DynamoDB.Keys
からkey、DynamoDB.NewImage
からvalueの値をそれぞれ取得して同じ値を設定し、現在時刻をtimestampに付加して、replica-tableにデータを作成しています。
using System; using Amazon; using Amazon.DynamoDBv2; using Amazon.DynamoDBv2.DataModel; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; using Newtonsoft.Json; [assembly: LambdaSerializerAttribute(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))] namespace DynamoReplicationLambda { public class Function { private static readonly AmazonDynamoDBClient DbClient = new AmazonDynamoDBClient(RegionEndpoint.USWest2); // DynamoDBの更新情報がDynamoDBEventオブジェクトとして渡される public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { // DynamoDBEventオブジェクトをログに出力 context.Logger.LogLine(JsonConvert.SerializeObject(dynamoEvent)); using (var dbContext = new DynamoDBContext(DbClient)) { foreach (var r in dynamoEvent.Records) { if (r.EventName == OperationType.INSERT || r.EventName == OperationType.MODIFY) { var item = new ReplicaTableItem { // キーはKeys(パーティションキーのみの場合は1つ、ソートキーを含む場合は2つ持つ)、 Key = r.Dynamodb.Keys["key"].S, // 変更後の値はNewImageから取得する Value = r.Dynamodb.NewImage["value"].S, Timestamp = DateTime.Now.ToString(), }; dbContext.SaveAsync(item).Wait(); } else if (r.EventName == OperationType.REMOVE) { // REMOVEの場合は、キーのみが渡される var item = new ReplicaTableItem { Key = r.Dynamodb.Keys["key"].S, }; dbContext.DeleteAsync(item).Wait(); } } } } } // replica-tableのエンティティ [DynamoDBTable("replica-table")] internal class ReplicaTableItem { [DynamoDBHashKey] [DynamoDBProperty(AttributeName = "key")] public string Key { get; set; } [DynamoDBProperty(AttributeName = "value")] public string Value { get; set; } [DynamoDBProperty(AttributeName = "timestamp")] public string Timestamp { get; set; } } }
Lambdaのデプロイ
AWSのコンソール画面からLambdaを作成してデプロイします。
キーとなるのはトリガの設定で、先ほど作成したmaster-tableを指定することで、master-tableの更新イベントがDynamoDB Streamsに流れてLambdaが起動・実行されます。
したがってLambdaに割り当てるロールには対象のDynamoDBテーブルへdynamodb:GetRecords
、dynamodb:GetShardIterator
、dynamodb:DescribeStream
、dynamodb:DescribeTable
の4つの操作が許可されている必要があります。
それ以外のdynamodb:DescribeTable
、dynamodb:UpdateItem
、dynamodb:DeleteItem
は、replica-table更新のために許可しています。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": [ "*" ] }, { "Effect": "Allow", "Action": [ "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:DescribeStream", "dynamodb:ListStreams", "dynamodb:DescribeTable", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }
動作確認
master-tableに項目を追加してみます。
すると、Lambdaでは設定したトリガからLambdaが起動されます。
DynamoDBEventオブジェクトをトレースしたログを見ると、以下のイベントが渡されます。
EventName.Value
で新規作成されたこと、EventSourceArn
でmaster-tableのDynamoDB StreamがLambdaを起動していることがわかります。
また、Dynamodb.Keys
にパーティションキー、DynamoDB.NewImage
に追加された値がそれぞれ入っていることも確認できます。
- 変更では、
EventName.Value
が"MODIFY"となり、DynamoDB.OldImage
に変更前の値も入ります。 - 削除では、
EventName.Value
が"REMOVE"となり、DynamoDB.OldImage
もDynamoDB.NewImage
も空でDynamoDB.Keys
のみに値が詰められます。
{ "Records": [ { "EventSourceArn": "arn:aws:dynamodb:us-west-2:XXXXXXXXXXXX:table/master-table/stream/2017-02-17T04:50:43.602", "AwsRegion": "us-west-2", "Dynamodb": { "ApproximateCreationDateTime": "2017-02-17T04:59:00Z", "Keys": { "key": { "B": null, "BOOL": false, "IsBOOLSet": false, "BS": [], "L": [], "IsLSet": false, "M": {}, "IsMSet": false, "N": null, "NS": [], "NULL": false, "S": "key1", "SS": [] } }, "NewImage": { "value": { "B": null, "BOOL": false, "IsBOOLSet": false, "BS": [], "L": [], "IsLSet": false, "M": {}, "IsMSet": false, "N": null, "NS": [], "NULL": false, "S": "value1", "SS": [] }, "key": { "B": null, "BOOL": false, "IsBOOLSet": false, "BS": [], "L": [], "IsLSet": false, "M": {}, "IsMSet": false, "N": null, "NS": [], "NULL": false, "S": "key1", "SS": [] } }, "OldImage": {}, "SequenceNumber": "2656400000000004815564263", "SizeBytes": 25, "StreamViewType": { "Value": "NEW_AND_OLD_IMAGES" } }, "EventID": "6208a61d1fae612ca60e9857acbb9cda", "EventName": { "Value": "INSERT" }, "EventSource": "aws:dynamodb", "EventVersion": "1.1" } ] }
同じデータがタイムスタンプ付きでreplica-tableにも作成されていることが確認できました。