AWS DynamoDBをC#でCRUDする

今回はDynamoDBへのCRUD操作を、C#で書いて、Lambdaで動かしてみたいと思います。

テーブルの作成

まずはAWS ConsoleからDynamoDBへテーブルを予め作成します。

  • テーブル名を"Todos"とします。
  • パーティションキーを"Id"、ソートキーを"Due"とします。 f:id:ohke:20170106235004j:plain:w450

エンティティクラスの作成

AWS Lambdaプロジェクトを作成し、今回はDynamoDBTable属性を付与したエンティティクラスを使います。

  • AWS Lambdaプロジェクトの作成とデプロイは下記の投稿に詳しく記載されています。

qiita.com

  • DynamoDBTable属性が付与することでTodoEntityクラスがテーブル("Todos")へマッピングされます。
using System.Collections.Generic;
using Amazon.DynamoDBv2.DataModel;

namespace AWSLambda
{
    [DynamoDBTable("Todos")]
    public class TodoEntity
    {
        // パーティションキー(ハッシュキー)
        [DynamoDBHashKey]
        public string Id { get; set; }

        // ソートキー(レンジキー)
        [DynamoDBRangeKey]
        public string Due { get; set; }

        // テーブルでは項目名"Done"にマッピングされる
        [DynamoDBProperty("Done")]
        public bool? DoneFlag { get; set; }

        // List<T>とすることで複数項目にマッピングされる
        public List<string> Contents { get; set; }

        // テーブルへマッピングされない
        [DynamoDBIgnore]
        public string Operation { get; set; }
    }
}

Lambdaの作成

それではLambdaを作成します。

  • TodosテーブルへマッピングされたTodoEntityオブジェクトを介すことで、DynamoDBへ接続されたDynamoDBContextを使って読み書きします。
  • TodoEntityクラスを引数として、Operationの値によってInsert/Select/Update/Deleteの処理を分岐しています。
  • いずれの処理もパーティションキーとソートキーの指定が必要です。
using Amazon;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.DataModel;
using Amazon.Lambda.Core;
using Newtonsoft.Json;

namespace AWSLambda
{
    public class DynamoDbOperationFunction
    {
        private static readonly AmazonDynamoDBClient Client = new AmazonDynamoDBClient(RegionEndpoint.USWest2);

        // JSONをデシリアライズしてTodoEntityオブジェクトを得る
        [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
        public void DynamoDbOperationHandler(TodoEntity entity, ILambdaContext context)
        {
            context.Logger.LogLine(JsonConvert.SerializeObject(entity));

            // 最初にDynamoDBContextを生成する
            using (var dbContext = new DynamoDBContext(Client))
            {
                switch (entity.Operation)
                {
                    case "Insert":
                        // SaveAsyncメソッドでentityがTodosテーブルへ追加
                        var insertTask = dbContext.SaveAsync(entity);
                        insertTask.Wait();
                        break;

                    case "Select":
                        // LoadAsyncメソッドでパーティションキーとソートキーが一致するレコードを取得
                        var selectTask = dbContext.LoadAsync<TodoEntity>(hashKey: entity.Id, rangeKey: entity.Due);
                        selectTask.Wait();
                        context.Logger.LogLine(JsonConvert.SerializeObject(selectTask.Result));
                        break;

                    case "Update":
                        // 更新対象のレコードをLoadAsyncで取得して、
                        var updateTask1 = dbContext.LoadAsync<TodoEntity>(hashKey: entity.Id, rangeKey: entity.Due);
                        updateTask1.Wait();
                        var updatedEntity = updateTask1.Result;

                        // 更新後のレコードをSaveAsyncでテーブルに追加する
                        // (すなわち、パーティションキーとソートキーが一致する場合は上書きされる)
                        updatedEntity.Contents = entity.Contents ?? updatedEntity.Contents;
                        updatedEntity.DoneFlag = entity.DoneFlag ?? updatedEntity.DoneFlag;
                        var updateTask2 = dbContext.SaveAsync(entity);
                        updateTask2.Wait();
                        break;

                    case "Delete":
                        // DeleteAsyncメソッドでパーティションキーとソートキーが一致するレコードを削除
                        var deleteTask = dbContext.DeleteAsync<TodoEntity>(hashKey: entity.Id, rangeKey: entity.Due);
                        deleteTask.Wait();
                        break;
                }
            }
        }
    }
}

動作確認

それではAWS Explorerからデプロイして動作確認します。

Insertでは以下のような引数用のJSONを作成し、

{
    "Id": "001",
    "Due": "2017-01-10T10:00:00",
    "DoneFlag": false,
    "Contents": [
        "abcde",
        "12345"
    ],
    "Operation": "Insert"
}

AWS Explorerから実行してみます。 f:id:ohke:20170106235017j:plain

そしてAWS ConsoleからDynamoDBのTodosテーブルを開くと、Idが"001"のレコードが作成されていることがわかるかと思います。

  • Contentsに複数項目、またOperationが無いことも確認してください。

f:id:ohke:20170106235538j:plain:w450

同じようにSelectも以下のような引数用のJSONを作成し、

{
    "Id": "001",
    "Due": "2017-01-10T10:00:00",
    "Operation": "Select"
}

AWS Explorerから実行すると参照できていることも確認できるかと思います。
f:id:ohke:20170106235837j:plain:w450

UpdateではDoneFlagとContentsを変更した引数を渡しており、

{
    "Id": "001",
    "Due": "2017-01-10T10:00:00",
    "DoneFlag": true,
    "Contents": [
        "ABCDE",
        "54321"
    ],
    "Operation": "Update"
}

それぞれ更新されていることが確認できます。 f:id:ohke:20170107000028j:plain:w450

Deleteもパーティションキーとソートキーを渡すだけです。

{
    "Id": "001",
    "Due": "2017-01-10T10:00:00",
    "Operation": "Delete"
}

まとめ

今回はDynamoDBTable属性とDynamoDBContextを使ってDynamoDBを操作しましたが、ScanやQuery、BatchWrite、BatchGetなども提供されています。
これ以外にもドキュメントモデルや低レベルAPIを使って操作する方法もありますので、今後紹介できればと思います。

AWS Kinesis StreamへPut/GetするLambdaをC#で書いてみる

今回はKinesis StreamへPut/GetするLambdaをC#で書いてみます。
データフローとしてはLambda→Kinesis Stream→Lambdaとなります。

Kinesis Streamの作成

Kinesis Streamは、デバイス情報やログなどの継続して生成され続けるデータを効率的に処理するためのマネージドサービスです。
シャード(shard)と呼ばれるスループット単位で入出力され、データの保持期間は24時間という制限があります。

今回は1シャードのみのKinesis Streamを"test"という名前で作成します。

f:id:ohke:20161230154449j:plain

なお、Kinesisは無料枠から外れ、一切の利用がなかった場合でもシャード数に応じて時間料金が発生します。 詳しくはこちらでご確認ください。

Lambdaの実装

まずはKinesis Streamから入出力するデータのクラス(単純なPOCO)を定義します。

public class KinesisRecord
{
    public string StringValue { get; set; }
    public int IntValue { get; set; }
    public double DoubleValue { get; set; }
    public DateTime DateTimeValue { get; set; }
    public PocoClass PocoValue { get; set; }
}

public class PocoClass
{
    public string InnerValue { get; set; }
}

次にPutする側のLambdaとして、KinesisPutFunctionを実装・アップロードします。
PutRecordRequestを作成して、AmazonKinesisClientでPutさせます。

  • データの一部を保持するKinesisPutHandlerInputクラスのオブジェクトを引数としています。
    • JSONリアライザ属性を付与しているので、JSON形式で入出力されます。
  • AmazonKinesisClientなどを使うため、AWSSDKをインストールする必要があります。
  • PutRecordRequestのPartitionKeyは入出力するシャードの選択に使われます。
    • 今回は1シャードのみなので固定値にしています。
    • 複数のシャードを使う場合は、偏らせないためにカーディナリティの高い値を設定する必要があります。
using System;
using System.IO;
using System.Text;
using Amazon;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using Amazon.Lambda.Core;
using Newtonsoft.Json;

namespace AWSLambda
{
    public class KinesisPutFunction
    {
        [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
        public PutRecordResponse KinesisPutHandler(KinesisPutHandlerInput input, ILambdaContext context)
        {
            // Kinesis StreamへPUTしたいオブジェクト
            var record = new KinesisRecord
            {
                StringValue = input.StringValue,
                IntValue = input.IntValue,
                DoubleValue = input.DoubleValue,
                DateTimeValue = DateTime.Now,
                PocoValue = new PocoClass { InnerValue = "Inner string."}
            };

            var request = new PutRecordRequest
            {
                // 先ほど作成したKinesis Streamの名前
                StreamName = "test",
                // 1シャードのため固定値
                PartitionKey = "test-partitionkey-0",
                // JSON形式でシリアライズ 
                Data = new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(record)))
            };

            var client = new AmazonKinesisClient(RegionEndpoint.USWest2);
            // PUTは非同期のみ
            var result = client.PutRecordAsync(request);
            result.Wait();

            // PutRecordResponseオブジェクトをそのままJSON形式で返す
            return result.Result;
        }
    }

    public class KinesisPutHandlerInput
    {
        public string StringValue { get; set; }
        public int IntValue { get; set; }
        public double DoubleValue { get; set; }
    }
}

最後にGetする側のLambdaとして、KinesisGetFunctionを実装・アップロードします。
今回はKinesis StreamへのPutをトリガーとして実行させます。

  • Kinesis Streamをトリガとする場合、KinesisEventが引数として渡されます。
    • KinesisEventからPutされたレコードを取り出します。
using System.IO;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Newtonsoft.Json;

namespace AWSLambda
{
    public class KinesisGetFunction
    {
        [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
        public void KinesisGetHandler(KinesisEvent kinesisEvent, ILambdaContext context)
        {
            // 今回は1レコード単位でトリガーされるので、1となる
            context.Logger.LogLine($"Records count: {kinesisEvent.Records.Count}");

            foreach (var record in kinesisEvent.Records)
            {
                // MemoryStreamオブジェクトのため、JSON文字列に変換してからデシリアライズ
                var jsonString = new StreamReader(record.Kinesis.Data).ReadToEnd();
                context.Logger.Log(jsonString);

                var data = JsonConvert.DeserializeObject<KinesisRecord>(jsonString);
                context.Logger.Log($"{data.StringValue}, " +
                                   $"{data.IntValue}, " +
                                   $"{data.DoubleValue}, " +
                                   $"{data.DateTimeValue}," +
                                   $"{data.PocoValue.InnerValue}");
            }
        }
    }
}

Kinesis Streamをポーリングさせるために、KinesisGetFunctionのアップロード後にAWSコンソールからトリガの設定を行います。

  • Batch sizeはLambdaの呼び出しごとに読み込むレコード数を指定します。
    • 1レコードずつ取り出したいので、"1"で設定
  • Starting positionはどのレコードの読み込みから開始すべきかを指定します。
    • Kinesis Streamが空の状態からポーリングを開始するので、"Latest"で設定
      f:id:ohke:20161230154721j:plain:w350

テスト

それではKinesisPutFunctionから起動してKinesisStreamへレコードをPutさせ、トリガで起動したKinesisGetFunctionからレコードをGetさせる、一連の流れをテストしてみます。

AWSコンソールからも可能ですが、今回はVisual StudioAWS Explorer(メニューの[表示]>[AWS Explorer])から実行してみます。
作成したKinesisPutFunctionを選択して[Test Function]タブを開き、リクエストをJSONで記述して、Invokeで実行します。

  • シーケンス番号やシャードIDが返ってきているため、Putに成功しています。

f:id:ohke:20161230155436j:plain

次にAWS ExplorerからKinesisGetFunctionを選択して[Logs]タブを開き、ログファイルをダウンロードします。
f:id:ohke:20161230155759j:plain

Putしたレコードの値が取り出せていることが確認できるかと思います。

2016-23-30 14:23:21: START RequestId: 221a6c3d-52a1-4c26-b8c1-f3fde43a4cd4 Version: $LATEST
2016-23-30 14:23:26: Records count: 1
2016-23-30 14:23:26: {"StringValue":"Hello, World!","IntValue":1,"DoubleValue":2.5,"DateTimeValue":"2016-12-30T05:23:15.010974+00:00","PocoValue":{"InnerValue":"Inner string."}}
2016-23-30 14:23:26: Hello, World!, 1, 2.5, 12/30/16 5:23:15 AM,Inner string.2016-23-30 14:23:26: END RequestId: 221a6c3d-52a1-4c26-b8c1-f3fde43a4cd4
2016-23-30 14:23:26: REPORT RequestId: 221a6c3d-52a1-4c26-b8c1-f3fde43a4cd4 Duration: 5008.18 ms    Billed Duration: 5100 ms    Memory Size: 128 MB Max Memory Used: 29 MB  

AWSコンソールからも無事に出力されています。
f:id:ohke:20161230155639j:plain

まとめ

トランザクションと永続化を前提とするRDBなどと比較すると、Kinesis Streamの考え方・使い方は一種独特で、慣れが必要だと思います。
シャードやシーケンス番号を考慮するともっと複雑になってきますので、まずは最初の足がかりとして1レコードのPut/Getを作ってみました。

AWS API GatewayでフックされるLambdaをC#で書いてみる

最近仕事でAWSに触っています。
また、12/1にAWSのLambdaがC#に対応したこともあり、Lambdaで色々遊んでます。

AWS Lambda Supports C#

今回の投稿では、HTTPリクエストを受けたAPI GatewayがLambdaをフックしてレスポンスを返す、ということをやってみます。

準備

AWS Toolkitを使い、.NET Coreアプリケーションとして開発します。 いくつかの投稿で詳しく説明されています。

qiita.com

blog.shibayan.jp

Lambda

AWS Lambda Projectを作成し、Lambda Functionを実装します。
ここではParamクラスをリクエストとして受け取り、そのままレスポンスとして返すだけの簡単なLambdaを定義します。

  • JSONでリクエスト・レスポンスするので、シリアライゼーションにAmazon.Lambda.Serialization.Json.JsonSerializerを使います。
  • 何も実装・継承していない素のクラスで、Lambdaで呼び出すハンドラ(この場合LambdaTestHandler)をPublish時に指定します。
using Amazon.Lambda.Core;

namespace LambdaTest
{
    public class LambdaTestFunction
    {
        [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
        public Param LambdaTestHandler(Param input, ILambdaContext context)
        {
            return input;
        }

        public class Param
        {
            public string PathValue { get; set; }
            public string QueryValue { get; set; }
            public string HeaderValue { get; set; }
        }
    }
}

API Gateway

次にコンソール画面から、"test-api"というAPIを作成していきます。
test-apiでは、GETメソッドを定義し、パスパラメータ・クエリパラメータ・リクエストヘッダの値をJSON形式にしてLambda(先程作成したLambdaTestFunction)へ渡します。 f:id:ohke:20161229092432j:plain

  • パスパラメータはブラケットでくくられたリソース(ここでは、/{pathparam})として定義します。
    f:id:ohke:20161229092606j:plain

  • メソッドリクエストではLmabdaの入力値として使用したいパラメータを定義します。

    • pathparam、queryparam、headerparamとします。 f:id:ohke:20161229092625j:plain:w350
  • 統合リクエストではJSON形式でLambdaへ渡るようにマッピングを定義します。

    • Content-Typeはapplication/jsonとします。
    • テンプレートでは、$input.params('メソッドリクエストで定義した名前')でパラメータへアクセスできます。
{
    "PathValue": "$input.params('pathparam')",
    "QueryValue": "$input.params('queryparam')",
    "HeaderValue": "$input.params('headerparam')"
}
  • レスポンスは何も設定しません。
    • Lambda側でJSONリアライザを指定しているので、レスポンスクラス(ここではParam)がJSON形式で返されます。

最後にテストをすると、JSONの値が返ってくるようになります。 f:id:ohke:20161229092738j:plain:w400

まとめ

簡単なJSONエコー?ではありますが、EC2などのサーバを一切使わずに、Web APIを提供できるようになりました。