Lambda(C#)でDynamoDB Streamsを使う

DynamoDB StreamsをLambda(C#)で使ってみました。

DynamoDB Streams

DynamoDBにはテーブルへの更新(作成、変更、削除)をキャプチャし、その情報(レコード)をバッファリングするストリームを作成することができる「DynamoDB Streams」という機能が提供されています。
レコードは1回のみ現れること・項目の更新順にレコードが取得されること・レコードは24時間保持されることが保証されており、Kinesis Streamとほぼ同じように扱うことができます。

docs.aws.amazon.com

今回はこのDynamoDB Streamsを使って、DynamoDBテーブルの更新を別のDynamoDBテーブルへ同期させるLambdaアプリケーションを作ります。

f:id:ohke:20170217221624p:plain

DynamoDBの作成

同期元となるmaster-tableと同期先となるreplica-tableを作成します。 いずれもパーティションキーKeyが設定されているだけです。

f:id:ohke:20170217211837j:plain:w400

Lambdaの実装

これまで同様、AWS ToolsをインストールしたVisual StudioからLambdaプロジェクトを作成します。

まずは、project.jsonAmazon.Lambda.DynamoDBEventsAWSSDK.DynamoDBv2を追記して、パッケージをインストールします。
DynamoDBEventsは後述するDynamoDBEventクラスで、DynamoDBv2はreplica-tableの更新で、それぞれ必要となります

{
  "version": "1.0.0-*",
  "buildOptions": {
  },

  "dependencies": {
    "Microsoft.NETCore.App": {
      "type": "platform",
      "version": "1.0.0"
    },

    "Amazon.Lambda.Core": "1.0.0*",
    "Amazon.Lambda.Serialization.Json": "1.0.1",
    "Amazon.Lambda.Tools": {
      "type": "build",
      "version": "1.2.1-preview1"
    },
    "Amazon.Lambda.DynamoDBEvents": "1.0.0",
    "AWSSDK.DynamoDBv2": "3.3.1.5"
  },

  "tools": {
    "Amazon.Lambda.Tools" : "1.2.1-preview1"
  },

  "frameworks": {
    "netcoreapp1.0": {
      "imports": "dnxcore50"
    }
  }
}

Lambdaのソースコード(デフォルトであればFunction.cs)を記述します。
ポイントはハンドラの引数として渡されるAmazon.Lambda.DynamoDbEvents.DynamoDBEventオブジェクトで、このオブジェクトの中にどのテーブルのどの項目に対してどんな更新(作成 or 変更 or 削除)が行われたのか、といった情報が詰められています。 今回はこのDynamoDBEventオブジェクトを解析して、同じ変更をreplica-tableに対しても行います。
例えば、EventNameが"INSERT"の場合は、DynamoDB.Keysからkey、DynamoDB.NewImageからvalueの値をそれぞれ取得して同じ値を設定し、現在時刻をtimestampに付加して、replica-tableにデータを作成しています。

using System;
using Amazon;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.DataModel;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Newtonsoft.Json;

[assembly: LambdaSerializerAttribute(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]

namespace DynamoReplicationLambda
{
    public class Function
    {
        private static readonly AmazonDynamoDBClient DbClient = new AmazonDynamoDBClient(RegionEndpoint.USWest2);

        // DynamoDBの更新情報がDynamoDBEventオブジェクトとして渡される
        public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
        {
            // DynamoDBEventオブジェクトをログに出力
            context.Logger.LogLine(JsonConvert.SerializeObject(dynamoEvent));

            using (var dbContext = new DynamoDBContext(DbClient))
            {
                foreach (var r in dynamoEvent.Records)
                {
                    if (r.EventName == OperationType.INSERT || r.EventName == OperationType.MODIFY)
                    {
                        var item = new ReplicaTableItem
                        {
                            // キーはKeys(パーティションキーのみの場合は1つ、ソートキーを含む場合は2つ持つ)、
                            Key = r.Dynamodb.Keys["key"].S,
                            // 変更後の値はNewImageから取得する
                            Value = r.Dynamodb.NewImage["value"].S,
                            Timestamp = DateTime.Now.ToString(),
                        };

                        dbContext.SaveAsync(item).Wait();
                    }
                    else if (r.EventName == OperationType.REMOVE)
                    {
                        // REMOVEの場合は、キーのみが渡される
                        var item = new ReplicaTableItem
                        {
                            Key = r.Dynamodb.Keys["key"].S,
                        };

                        dbContext.DeleteAsync(item).Wait();
                    }
                }
            }
        }
    }

    // replica-tableのエンティティ
    [DynamoDBTable("replica-table")]
    internal class ReplicaTableItem
    {
        [DynamoDBHashKey]
        [DynamoDBProperty(AttributeName = "key")]
        public string Key { get; set; }
        
        [DynamoDBProperty(AttributeName = "value")]
        public string Value { get; set; }

        [DynamoDBProperty(AttributeName = "timestamp")]
        public string Timestamp { get; set; }
    }
}

Lambdaのデプロイ

AWSのコンソール画面からLambdaを作成してデプロイします。

キーとなるのはトリガの設定で、先ほど作成したmaster-tableを指定することで、master-tableの更新イベントがDynamoDB Streamsに流れてLambdaが起動・実行されます。

f:id:ohke:20170217212335j:plain:w450

したがってLambdaに割り当てるロールには対象のDynamoDBテーブルへdynamodb:GetRecordsdynamodb:GetShardIteratordynamodb:DescribeStreamdynamodb:DescribeTableの4つの操作が許可されている必要があります。
それ以外のdynamodb:DescribeTabledynamodb:UpdateItemdynamodb:DeleteItemは、replica-table更新のために許可しています。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:DescribeStream",
                "dynamodb:ListStreams",
                "dynamodb:DescribeTable",
                "dynamodb:UpdateItem",
                "dynamodb:DeleteItem",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}

動作確認

master-tableに項目を追加してみます。

f:id:ohke:20170217213937j:plain:w300

すると、Lambdaでは設定したトリガからLambdaが起動されます。

f:id:ohke:20170217214235j:plain:w450

DynamoDBEventオブジェクトをトレースしたログを見ると、以下のイベントが渡されます。
EventName.Valueで新規作成されたこと、EventSourceArnでmaster-tableのDynamoDB StreamがLambdaを起動していることがわかります。 また、Dynamodb.Keysパーティションキー、DynamoDB.NewImageに追加された値がそれぞれ入っていることも確認できます。

  • 変更では、EventName.Valueが"MODIFY"となり、DynamoDB.OldImageに変更前の値も入ります。
  • 削除では、EventName.Valueが"REMOVE"となり、DynamoDB.OldImageDynamoDB.NewImageも空でDynamoDB.Keysのみに値が詰められます。
{
    "Records": [
        {
            "EventSourceArn": "arn:aws:dynamodb:us-west-2:XXXXXXXXXXXX:table/master-table/stream/2017-02-17T04:50:43.602",
            "AwsRegion": "us-west-2",
            "Dynamodb": {
                "ApproximateCreationDateTime": "2017-02-17T04:59:00Z",
                "Keys": {
                    "key": {
                        "B": null,
                        "BOOL": false,
                        "IsBOOLSet": false,
                        "BS": [],
                        "L": [],
                        "IsLSet": false,
                        "M": {},
                        "IsMSet": false,
                        "N": null,
                        "NS": [],
                        "NULL": false,
                        "S": "key1",
                        "SS": []
                    }
                },
                "NewImage": {
                    "value": {
                        "B": null,
                        "BOOL": false,
                        "IsBOOLSet": false,
                        "BS": [],
                        "L": [],
                        "IsLSet": false,
                        "M": {},
                        "IsMSet": false,
                        "N": null,
                        "NS": [],
                        "NULL": false,
                        "S": "value1",
                        "SS": []
                    },
                    "key": {
                        "B": null,
                        "BOOL": false,
                        "IsBOOLSet": false,
                        "BS": [],
                        "L": [],
                        "IsLSet": false,
                        "M": {},
                        "IsMSet": false,
                        "N": null,
                        "NS": [],
                        "NULL": false,
                        "S": "key1",
                        "SS": []
                    }
                },
                "OldImage": {},
                "SequenceNumber": "2656400000000004815564263",
                "SizeBytes": 25,
                "StreamViewType": {
                    "Value": "NEW_AND_OLD_IMAGES"
                }
            },
            "EventID": "6208a61d1fae612ca60e9857acbb9cda",
            "EventName": {
                "Value": "INSERT"
            },
            "EventSource": "aws:dynamodb",
            "EventVersion": "1.1"
        }
    ]
}

同じデータがタイムスタンプ付きでreplica-tableにも作成されていることが確認できました。

f:id:ohke:20170217215711j:plain:w350