Lambda(C#)でDynamoDB Streamsを使う

DynamoDB StreamsをLambda(C#)で使ってみました。

DynamoDB Streams

DynamoDBにはテーブルへの更新(作成、変更、削除)をキャプチャし、その情報(レコード)をバッファリングするストリームを作成することができる「DynamoDB Streams」という機能が提供されています。
レコードは1回のみ現れること・項目の更新順にレコードが取得されること・レコードは24時間保持されることが保証されており、Kinesis Streamとほぼ同じように扱うことができます。

docs.aws.amazon.com

今回はこのDynamoDB Streamsを使って、DynamoDBテーブルの更新を別のDynamoDBテーブルへ同期させるLambdaアプリケーションを作ります。

f:id:ohke:20170217221624p:plain

DynamoDBの作成

同期元となるmaster-tableと同期先となるreplica-tableを作成します。 いずれもパーティションキーKeyが設定されているだけです。

f:id:ohke:20170217211837j:plain:w400

Lambdaの実装

これまで同様、AWS ToolsをインストールしたVisual StudioからLambdaプロジェクトを作成します。

まずは、project.jsonAmazon.Lambda.DynamoDBEventsAWSSDK.DynamoDBv2を追記して、パッケージをインストールします。
DynamoDBEventsは後述するDynamoDBEventクラスで、DynamoDBv2はreplica-tableの更新で、それぞれ必要となります

{
  "version": "1.0.0-*",
  "buildOptions": {
  },

  "dependencies": {
    "Microsoft.NETCore.App": {
      "type": "platform",
      "version": "1.0.0"
    },

    "Amazon.Lambda.Core": "1.0.0*",
    "Amazon.Lambda.Serialization.Json": "1.0.1",
    "Amazon.Lambda.Tools": {
      "type": "build",
      "version": "1.2.1-preview1"
    },
    "Amazon.Lambda.DynamoDBEvents": "1.0.0",
    "AWSSDK.DynamoDBv2": "3.3.1.5"
  },

  "tools": {
    "Amazon.Lambda.Tools" : "1.2.1-preview1"
  },

  "frameworks": {
    "netcoreapp1.0": {
      "imports": "dnxcore50"
    }
  }
}

Lambdaのソースコード(デフォルトであればFunction.cs)を記述します。
ポイントはハンドラの引数として渡されるAmazon.Lambda.DynamoDbEvents.DynamoDBEventオブジェクトで、このオブジェクトの中にどのテーブルのどの項目に対してどんな更新(作成 or 変更 or 削除)が行われたのか、といった情報が詰められています。 今回はこのDynamoDBEventオブジェクトを解析して、同じ変更をreplica-tableに対しても行います。
例えば、EventNameが"INSERT"の場合は、DynamoDB.Keysからkey、DynamoDB.NewImageからvalueの値をそれぞれ取得して同じ値を設定し、現在時刻をtimestampに付加して、replica-tableにデータを作成しています。

using System;
using Amazon;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.DataModel;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Newtonsoft.Json;

[assembly: LambdaSerializerAttribute(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]

namespace DynamoReplicationLambda
{
    public class Function
    {
        private static readonly AmazonDynamoDBClient DbClient = new AmazonDynamoDBClient(RegionEndpoint.USWest2);

        // DynamoDBの更新情報がDynamoDBEventオブジェクトとして渡される
        public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
        {
            // DynamoDBEventオブジェクトをログに出力
            context.Logger.LogLine(JsonConvert.SerializeObject(dynamoEvent));

            using (var dbContext = new DynamoDBContext(DbClient))
            {
                foreach (var r in dynamoEvent.Records)
                {
                    if (r.EventName == OperationType.INSERT || r.EventName == OperationType.MODIFY)
                    {
                        var item = new ReplicaTableItem
                        {
                            // キーはKeys(パーティションキーのみの場合は1つ、ソートキーを含む場合は2つ持つ)、
                            Key = r.Dynamodb.Keys["key"].S,
                            // 変更後の値はNewImageから取得する
                            Value = r.Dynamodb.NewImage["value"].S,
                            Timestamp = DateTime.Now.ToString(),
                        };

                        dbContext.SaveAsync(item).Wait();
                    }
                    else if (r.EventName == OperationType.REMOVE)
                    {
                        // REMOVEの場合は、キーのみが渡される
                        var item = new ReplicaTableItem
                        {
                            Key = r.Dynamodb.Keys["key"].S,
                        };

                        dbContext.DeleteAsync(item).Wait();
                    }
                }
            }
        }
    }

    // replica-tableのエンティティ
    [DynamoDBTable("replica-table")]
    internal class ReplicaTableItem
    {
        [DynamoDBHashKey]
        [DynamoDBProperty(AttributeName = "key")]
        public string Key { get; set; }
        
        [DynamoDBProperty(AttributeName = "value")]
        public string Value { get; set; }

        [DynamoDBProperty(AttributeName = "timestamp")]
        public string Timestamp { get; set; }
    }
}

Lambdaのデプロイ

AWSのコンソール画面からLambdaを作成してデプロイします。

キーとなるのはトリガの設定で、先ほど作成したmaster-tableを指定することで、master-tableの更新イベントがDynamoDB Streamsに流れてLambdaが起動・実行されます。

f:id:ohke:20170217212335j:plain:w450

したがってLambdaに割り当てるロールには対象のDynamoDBテーブルへdynamodb:GetRecordsdynamodb:GetShardIteratordynamodb:DescribeStreamdynamodb:DescribeTableの4つの操作が許可されている必要があります。
それ以外のdynamodb:DescribeTabledynamodb:UpdateItemdynamodb:DeleteItemは、replica-table更新のために許可しています。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:DescribeStream",
                "dynamodb:ListStreams",
                "dynamodb:DescribeTable",
                "dynamodb:UpdateItem",
                "dynamodb:DeleteItem",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}

動作確認

master-tableに項目を追加してみます。

f:id:ohke:20170217213937j:plain:w300

すると、Lambdaでは設定したトリガからLambdaが起動されます。

f:id:ohke:20170217214235j:plain:w450

DynamoDBEventオブジェクトをトレースしたログを見ると、以下のイベントが渡されます。
EventName.Valueで新規作成されたこと、EventSourceArnでmaster-tableのDynamoDB StreamがLambdaを起動していることがわかります。 また、Dynamodb.Keysパーティションキー、DynamoDB.NewImageに追加された値がそれぞれ入っていることも確認できます。

  • 変更では、EventName.Valueが"MODIFY"となり、DynamoDB.OldImageに変更前の値も入ります。
  • 削除では、EventName.Valueが"REMOVE"となり、DynamoDB.OldImageDynamoDB.NewImageも空でDynamoDB.Keysのみに値が詰められます。
{
    "Records": [
        {
            "EventSourceArn": "arn:aws:dynamodb:us-west-2:XXXXXXXXXXXX:table/master-table/stream/2017-02-17T04:50:43.602",
            "AwsRegion": "us-west-2",
            "Dynamodb": {
                "ApproximateCreationDateTime": "2017-02-17T04:59:00Z",
                "Keys": {
                    "key": {
                        "B": null,
                        "BOOL": false,
                        "IsBOOLSet": false,
                        "BS": [],
                        "L": [],
                        "IsLSet": false,
                        "M": {},
                        "IsMSet": false,
                        "N": null,
                        "NS": [],
                        "NULL": false,
                        "S": "key1",
                        "SS": []
                    }
                },
                "NewImage": {
                    "value": {
                        "B": null,
                        "BOOL": false,
                        "IsBOOLSet": false,
                        "BS": [],
                        "L": [],
                        "IsLSet": false,
                        "M": {},
                        "IsMSet": false,
                        "N": null,
                        "NS": [],
                        "NULL": false,
                        "S": "value1",
                        "SS": []
                    },
                    "key": {
                        "B": null,
                        "BOOL": false,
                        "IsBOOLSet": false,
                        "BS": [],
                        "L": [],
                        "IsLSet": false,
                        "M": {},
                        "IsMSet": false,
                        "N": null,
                        "NS": [],
                        "NULL": false,
                        "S": "key1",
                        "SS": []
                    }
                },
                "OldImage": {},
                "SequenceNumber": "2656400000000004815564263",
                "SizeBytes": 25,
                "StreamViewType": {
                    "Value": "NEW_AND_OLD_IMAGES"
                }
            },
            "EventID": "6208a61d1fae612ca60e9857acbb9cda",
            "EventName": {
                "Value": "INSERT"
            },
            "EventSource": "aws:dynamodb",
            "EventVersion": "1.1"
        }
    ]
}

同じデータがタイムスタンプ付きでreplica-tableにも作成されていることが確認できました。

f:id:ohke:20170217215711j:plain:w350

Json.NETを使って任意のクラスやDictionary・Listにデシリアライズする

Json.NETを使って、JSONを任意のクラスとDictionary・Listにそれぞれデシリアライズしてみます。

www.nuget.org

任意のクラス(POCO)にデシリアライズする

例えばアプリケーションの設定ファイルのように静的な構造のJSONであれば、専用のPOCOにデシリアライズするほうが扱いやすいです。

{
  "Server": {
    "DomainName": "domain.name",
    "WebServerHostNames": [
      "web1.host.name",
      "web2.host.name"
    ],
    "DbServerHostNames": [
      "db1.host.name",
      "db2.host.name"
    ]
  },
  "Table": {
    "AccountTableName": "account-table" 
  }  
}

JSONのキー名とPOCOのプロパティ名が一致していれば、JsonConvert.DeserializeObject<Config>で上のJSONファイルをConfigクラスにデシリアライズできます。
ConfigとServerConfigのように、クラスは入れ子になっていてもOKです。

using Newtonsoft.Json;

public class Config
{
    public ServerConfig Server { get; set; }
    public TableConfig Table { get; set; }

    public class ServerConfig
    {
        public string DomainName { get; set; }
        public IList<string> WebServerHostNames { get; set; }
        public IList<string> DbServerHostNames { get; set; }
    }

    public class TableConfig
    {
        public string AccountTableName { get; set; }
    }
}

var config = JsonConvert.DeserializeObject<Config>(File.ReadAllText("config.json"));

DictionaryとListへデシリアライズする

例えば以下のマスタデータのようにキー("Kanto"や"Kinki")が動的に増減するJSONの場合は、DictionaryやListとして扱える方がソースコードに手をいれる必要が無いので都合が良いです。

{
  "Area": {
    "Kanto": [
      "Tokyo",
      "Chiba",
      "Kanagawa"
    ],
    "Kinki": [
      "Osaka",
      "Kyoto",
      "Nara"
    ] 
  }
}

POCOと同じく、JsonConvert.DeserializeObject<>でDictionaryやListへデシリアライズできます。
この場合も入れ子になっていても良く、Dictionary<string, Dictionary<string, List<string>>>でもOKです。

using Newtonsoft.Json;

var area =
    JsonConvert.DeserializeObject<Dictionary<string, Dictionary<string, List<string>>>>(
        File.ReadAllText("area.json"))["Area"];

foreach (var key in area.Keys)
{
    Console.WriteLine(key);
    foreach (var value in area[key])
    {
        Console.WriteLine(value);
    }
}

Go+Gin+DynamoDBでWeb APIサーバを作る

仕事でGoを使うこととなりましたので、年明けから勉強しておりました。
一つの区切りとしてナンチャッテToDo Web APIサーバを作りましたので、GitHubで公開しておきます。

github.com

参考文献

Goは↓の本を読んでました。
他の言語と比較しながらGoの考え方やクセを細かく説明してくれるので、CやJavaの経験者であれば効率的に吸収できるかと思います。
Amazon CAPTCHA

GinはREADME、DynamoDBはAWSのドキュメントとQiita投稿で使い方を学びました。

github.com

dynamodb - Amazon Web Services - Go SDK

あえて aws-sdk-go で dynamoDB を使うときの基本操作 - Qiita

開発環境

Visual Studio Code(Windows)で構築しました。
↓の記事でコーディングからデバッグまでの一式できるようになるかと思います。
WindowsのVisual Studio CodeでGo言語の開発環境を作る - 素敵なおひげですね

構成

ユーザ登録、ログイン/ログアウト、ToDoの投稿と取得ができる簡単なWeb APIです。
Web APIフレームワークとしてGin、DBはDynamoDBを使っています。

Gin

Ginは高速さを売りとしているGoのWeb APIフレームワークです。
↓をビルド・実行するだけで、さくっとWeb APIサーバになります。

package main

import "gopkg.in/gin-gonic/gin.v1"

func main() {
    r := gin.Default()
    r.GET("/hello", func(c *gin.Context) {
        c.JSON(200, gin.H{
            "message": "Hello, World!",
        })
    })
    r.Run()
}

Goはnet/httpというHTTPサーバを標準搭載していますが、GinはAPIに特化していますのでそこは作りやすいようになっています。

  • 例えば、net/httpだと、リクエストbodyを文字列として読み込んでからデシリアライズ(アンマーシャル)する必要がありますが、
  • Ginでは、c.BindJSON(&request)だけでrequestへ値が詰められます。

DynamoDB

このAPIでは2つのDynamoDBテーブルにアクセスします。
テーブル名(とリージョン)はconfig.jsonで変更できます。

テーブル名 パーティションキー ソートキー
todo-user-table Id -
todo-todo-table Id UserId

GoからのDynamoDBのアクセスには素のaws-sdk-goを使っています。 なかなかツライです。

  • ソースコードを見ていただければわかるかと思いますが、DynamoDBへのCRUDでかなりの行数となっている上、ポインタとおまじないチックな手順が入り乱れています。
    • ExpressionAttributeNamesで項目名のプレースホルダ、ExpressionAttributeValuesで値のプレースホルダを指定したり、
    • map[string]*dynamodb.AttributeValue{ ":value": { S: aws.String(value),},},とかしんどいです。
  • 何らかのライブラリでラッピングしたくなる。

宿題

  • デプロイ
    • ローカルでサーバを実行し、DynamoDBのみAWSへアクセスしていました。
    • AWS ElasticBeanstalkにコマンドからデプロイできるようしたいところです。
  • テスト
    • 標準で揃っていますが、まだ手を出せてないです。

↓の本を読んで勉強します。
みんなのGo言語【現場で使える実践テクニック】 | 松木雅幸, mattn, 藤原俊一郎, 中島大一, 牧 大輔, 鈴木健太, 稲葉貴洋 |本 | 通販 | Amazon