AWS Lambda(C#)の初回実行が遅いようなので調べました

最近の投稿でC#で書いたプログラムをLambdaで実行させていますが、初回実行が2回目以降と比較して明らかに遅いようなので簡単に調査しました。

構成

PCからJMeterAPI Gatewayへリクエストし、API GatewayがLambdaをキックさせます。

  • Lambda自体は定数文字列をJSON形式で返す以外に何もさせないが、ログストリームからLambdaの実行時間(Duration)をサンプリング
    • Cloudwatchのログストリームの取得では下記投稿のプログラムを使わせていただきました

qiita.com

using Amazon.Lambda.Core;

namespace AWSLambda
{
    public class Function
    {
        [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
        public string FunctionHandler(ILambdaContext context)
        {
            return "Hello, World!";
        }
    }
}
  • JMeterからは10スレッドから各10回のリクエスト(合計100リクエスト)を送信して遅延時間(elapsed)をサンプリング
    • Ramp-Upが0秒(全スレッドが同時起動)のため、各スレッドの最初のリクエストでLambda10個が同時に実行開始される
  • PCからAPI GatewayまでのRTTは100ms程度

結果

Lambdaの割当メモリを128MB、256MB、512MBの3パターンで計測してみた結果が、以下の表です。

128MB 256MB 512MB
1回目のアクセス時の平均遅延時間(ms) 5864 4312 2706
2回目以降のアクセス時の平均遅延時間(ms) 1631 413 431
1回目の平均Lambda実行時間(ms) 1869 986 467
2回目以降の平均Lambda実行時間(ms) 110 1.45 0.92
  • 1回目のアクセスが2回目以降のアクセスよりも明らかに時間を要し、メモリ512MBで平均2.7秒
    • 単純な処理時間の増大に加えて、滞留するリクエストの増大によるLambdaの同時実行数の増加も考慮する必要があります
      • 例えば100リクエスト/秒の場合、0.0秒~1.0秒の100リクエストで100個のLambdaが初回実行します
      • 初回実行のため2.7秒を要すると、1.0秒~2.7秒の間に受けたリクエストはLambdaの同時実行数の上限(通常100)に達して500エラーが返されます
  • 初回実行の場合も割当メモリを増やすことで実行時間は改善されるようです
    • 特に実行時間はメモリ割当に対してほぼ反比例しています
    • AWS公式でもメモリと比例したコンピューティングリソースの割当を明言されています

      AWS Lambda のリソースモデルでは、お客様が関数に必要なメモリ量を指定するとそれに比例した CPU パワーとその他のリソースが割り当てられます。
      たとえば、256 MB のメモリを指定すると約 2 倍の CPU パワーが Lambda 関数に割り当てられます。
      128 MB のメモリを指定した場合と比較すると CPU パワーは倍となり、512 MB のメモリを指定した場合と比較すると半分になります。

まとめ

Lambda単体での遅延時間・処理時間を計測することで、2回目以降と比較して初回実行に時間を要することを確認しました。
今回はプログラムをデプロイし直すことで初回実行を再現させていましたが、ある程度時間が経ってから実行すると初回実行と同等の時間を要するようになりますので、設計時に頭に入れておいたほうが良さそうです。

AWS DynamoDBをC#でCRUDする

今回はDynamoDBへのCRUD操作を、C#で書いて、Lambdaで動かしてみたいと思います。

テーブルの作成

まずはAWS ConsoleからDynamoDBへテーブルを予め作成します。

  • テーブル名を"Todos"とします。
  • パーティションキーを"Id"、ソートキーを"Due"とします。 f:id:ohke:20170106235004j:plain:w450

エンティティクラスの作成

AWS Lambdaプロジェクトを作成し、今回はDynamoDBTable属性を付与したエンティティクラスを使います。

  • AWS Lambdaプロジェクトの作成とデプロイは下記の投稿に詳しく記載されています。

qiita.com

  • DynamoDBTable属性が付与することでTodoEntityクラスがテーブル("Todos")へマッピングされます。
using System.Collections.Generic;
using Amazon.DynamoDBv2.DataModel;

namespace AWSLambda
{
    [DynamoDBTable("Todos")]
    public class TodoEntity
    {
        // パーティションキー(ハッシュキー)
        [DynamoDBHashKey]
        public string Id { get; set; }

        // ソートキー(レンジキー)
        [DynamoDBRangeKey]
        public string Due { get; set; }

        // テーブルでは項目名"Done"にマッピングされる
        [DynamoDBProperty("Done")]
        public bool? DoneFlag { get; set; }

        // List<T>とすることで複数項目にマッピングされる
        public List<string> Contents { get; set; }

        // テーブルへマッピングされない
        [DynamoDBIgnore]
        public string Operation { get; set; }
    }
}

Lambdaの作成

それではLambdaを作成します。

  • TodosテーブルへマッピングされたTodoEntityオブジェクトを介すことで、DynamoDBへ接続されたDynamoDBContextを使って読み書きします。
  • TodoEntityクラスを引数として、Operationの値によってInsert/Select/Update/Deleteの処理を分岐しています。
  • いずれの処理もパーティションキーとソートキーの指定が必要です。
using Amazon;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.DataModel;
using Amazon.Lambda.Core;
using Newtonsoft.Json;

namespace AWSLambda
{
    public class DynamoDbOperationFunction
    {
        private static readonly AmazonDynamoDBClient Client = new AmazonDynamoDBClient(RegionEndpoint.USWest2);

        // JSONをデシリアライズしてTodoEntityオブジェクトを得る
        [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
        public void DynamoDbOperationHandler(TodoEntity entity, ILambdaContext context)
        {
            context.Logger.LogLine(JsonConvert.SerializeObject(entity));

            // 最初にDynamoDBContextを生成する
            using (var dbContext = new DynamoDBContext(Client))
            {
                switch (entity.Operation)
                {
                    case "Insert":
                        // SaveAsyncメソッドでentityがTodosテーブルへ追加
                        var insertTask = dbContext.SaveAsync(entity);
                        insertTask.Wait();
                        break;

                    case "Select":
                        // LoadAsyncメソッドでパーティションキーとソートキーが一致するレコードを取得
                        var selectTask = dbContext.LoadAsync<TodoEntity>(hashKey: entity.Id, rangeKey: entity.Due);
                        selectTask.Wait();
                        context.Logger.LogLine(JsonConvert.SerializeObject(selectTask.Result));
                        break;

                    case "Update":
                        // 更新対象のレコードをLoadAsyncで取得して、
                        var updateTask1 = dbContext.LoadAsync<TodoEntity>(hashKey: entity.Id, rangeKey: entity.Due);
                        updateTask1.Wait();
                        var updatedEntity = updateTask1.Result;

                        // 更新後のレコードをSaveAsyncでテーブルに追加する
                        // (すなわち、パーティションキーとソートキーが一致する場合は上書きされる)
                        updatedEntity.Contents = entity.Contents ?? updatedEntity.Contents;
                        updatedEntity.DoneFlag = entity.DoneFlag ?? updatedEntity.DoneFlag;
                        var updateTask2 = dbContext.SaveAsync(entity);
                        updateTask2.Wait();
                        break;

                    case "Delete":
                        // DeleteAsyncメソッドでパーティションキーとソートキーが一致するレコードを削除
                        var deleteTask = dbContext.DeleteAsync<TodoEntity>(hashKey: entity.Id, rangeKey: entity.Due);
                        deleteTask.Wait();
                        break;
                }
            }
        }
    }
}

動作確認

それではAWS Explorerからデプロイして動作確認します。

Insertでは以下のような引数用のJSONを作成し、

{
    "Id": "001",
    "Due": "2017-01-10T10:00:00",
    "DoneFlag": false,
    "Contents": [
        "abcde",
        "12345"
    ],
    "Operation": "Insert"
}

AWS Explorerから実行してみます。 f:id:ohke:20170106235017j:plain

そしてAWS ConsoleからDynamoDBのTodosテーブルを開くと、Idが"001"のレコードが作成されていることがわかるかと思います。

  • Contentsに複数項目、またOperationが無いことも確認してください。

f:id:ohke:20170106235538j:plain:w450

同じようにSelectも以下のような引数用のJSONを作成し、

{
    "Id": "001",
    "Due": "2017-01-10T10:00:00",
    "Operation": "Select"
}

AWS Explorerから実行すると参照できていることも確認できるかと思います。
f:id:ohke:20170106235837j:plain:w450

UpdateではDoneFlagとContentsを変更した引数を渡しており、

{
    "Id": "001",
    "Due": "2017-01-10T10:00:00",
    "DoneFlag": true,
    "Contents": [
        "ABCDE",
        "54321"
    ],
    "Operation": "Update"
}

それぞれ更新されていることが確認できます。 f:id:ohke:20170107000028j:plain:w450

Deleteもパーティションキーとソートキーを渡すだけです。

{
    "Id": "001",
    "Due": "2017-01-10T10:00:00",
    "Operation": "Delete"
}

まとめ

今回はDynamoDBTable属性とDynamoDBContextを使ってDynamoDBを操作しましたが、ScanやQuery、BatchWrite、BatchGetなども提供されています。
これ以外にもドキュメントモデルや低レベルAPIを使って操作する方法もありますので、今後紹介できればと思います。

AWS Kinesis StreamへPut/GetするLambdaをC#で書いてみる

今回はKinesis StreamへPut/GetするLambdaをC#で書いてみます。
データフローとしてはLambda→Kinesis Stream→Lambdaとなります。

Kinesis Streamの作成

Kinesis Streamは、デバイス情報やログなどの継続して生成され続けるデータを効率的に処理するためのマネージドサービスです。
シャード(shard)と呼ばれるスループット単位で入出力され、データの保持期間は24時間という制限があります。

今回は1シャードのみのKinesis Streamを"test"という名前で作成します。

f:id:ohke:20161230154449j:plain

なお、Kinesisは無料枠から外れ、一切の利用がなかった場合でもシャード数に応じて時間料金が発生します。 詳しくはこちらでご確認ください。

Lambdaの実装

まずはKinesis Streamから入出力するデータのクラス(単純なPOCO)を定義します。

public class KinesisRecord
{
    public string StringValue { get; set; }
    public int IntValue { get; set; }
    public double DoubleValue { get; set; }
    public DateTime DateTimeValue { get; set; }
    public PocoClass PocoValue { get; set; }
}

public class PocoClass
{
    public string InnerValue { get; set; }
}

次にPutする側のLambdaとして、KinesisPutFunctionを実装・アップロードします。
PutRecordRequestを作成して、AmazonKinesisClientでPutさせます。

  • データの一部を保持するKinesisPutHandlerInputクラスのオブジェクトを引数としています。
    • JSONリアライザ属性を付与しているので、JSON形式で入出力されます。
  • AmazonKinesisClientなどを使うため、AWSSDKをインストールする必要があります。
  • PutRecordRequestのPartitionKeyは入出力するシャードの選択に使われます。
    • 今回は1シャードのみなので固定値にしています。
    • 複数のシャードを使う場合は、偏らせないためにカーディナリティの高い値を設定する必要があります。
using System;
using System.IO;
using System.Text;
using Amazon;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using Amazon.Lambda.Core;
using Newtonsoft.Json;

namespace AWSLambda
{
    public class KinesisPutFunction
    {
        [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
        public PutRecordResponse KinesisPutHandler(KinesisPutHandlerInput input, ILambdaContext context)
        {
            // Kinesis StreamへPUTしたいオブジェクト
            var record = new KinesisRecord
            {
                StringValue = input.StringValue,
                IntValue = input.IntValue,
                DoubleValue = input.DoubleValue,
                DateTimeValue = DateTime.Now,
                PocoValue = new PocoClass { InnerValue = "Inner string."}
            };

            var request = new PutRecordRequest
            {
                // 先ほど作成したKinesis Streamの名前
                StreamName = "test",
                // 1シャードのため固定値
                PartitionKey = "test-partitionkey-0",
                // JSON形式でシリアライズ 
                Data = new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(record)))
            };

            var client = new AmazonKinesisClient(RegionEndpoint.USWest2);
            // PUTは非同期のみ
            var result = client.PutRecordAsync(request);
            result.Wait();

            // PutRecordResponseオブジェクトをそのままJSON形式で返す
            return result.Result;
        }
    }

    public class KinesisPutHandlerInput
    {
        public string StringValue { get; set; }
        public int IntValue { get; set; }
        public double DoubleValue { get; set; }
    }
}

最後にGetする側のLambdaとして、KinesisGetFunctionを実装・アップロードします。
今回はKinesis StreamへのPutをトリガーとして実行させます。

  • Kinesis Streamをトリガとする場合、KinesisEventが引数として渡されます。
    • KinesisEventからPutされたレコードを取り出します。
using System.IO;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Newtonsoft.Json;

namespace AWSLambda
{
    public class KinesisGetFunction
    {
        [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
        public void KinesisGetHandler(KinesisEvent kinesisEvent, ILambdaContext context)
        {
            // 今回は1レコード単位でトリガーされるので、1となる
            context.Logger.LogLine($"Records count: {kinesisEvent.Records.Count}");

            foreach (var record in kinesisEvent.Records)
            {
                // MemoryStreamオブジェクトのため、JSON文字列に変換してからデシリアライズ
                var jsonString = new StreamReader(record.Kinesis.Data).ReadToEnd();
                context.Logger.Log(jsonString);

                var data = JsonConvert.DeserializeObject<KinesisRecord>(jsonString);
                context.Logger.Log($"{data.StringValue}, " +
                                   $"{data.IntValue}, " +
                                   $"{data.DoubleValue}, " +
                                   $"{data.DateTimeValue}," +
                                   $"{data.PocoValue.InnerValue}");
            }
        }
    }
}

Kinesis Streamをポーリングさせるために、KinesisGetFunctionのアップロード後にAWSコンソールからトリガの設定を行います。

  • Batch sizeはLambdaの呼び出しごとに読み込むレコード数を指定します。
    • 1レコードずつ取り出したいので、"1"で設定
  • Starting positionはどのレコードの読み込みから開始すべきかを指定します。
    • Kinesis Streamが空の状態からポーリングを開始するので、"Latest"で設定
      f:id:ohke:20161230154721j:plain:w350

テスト

それではKinesisPutFunctionから起動してKinesisStreamへレコードをPutさせ、トリガで起動したKinesisGetFunctionからレコードをGetさせる、一連の流れをテストしてみます。

AWSコンソールからも可能ですが、今回はVisual StudioAWS Explorer(メニューの[表示]>[AWS Explorer])から実行してみます。
作成したKinesisPutFunctionを選択して[Test Function]タブを開き、リクエストをJSONで記述して、Invokeで実行します。

  • シーケンス番号やシャードIDが返ってきているため、Putに成功しています。

f:id:ohke:20161230155436j:plain

次にAWS ExplorerからKinesisGetFunctionを選択して[Logs]タブを開き、ログファイルをダウンロードします。
f:id:ohke:20161230155759j:plain

Putしたレコードの値が取り出せていることが確認できるかと思います。

2016-23-30 14:23:21: START RequestId: 221a6c3d-52a1-4c26-b8c1-f3fde43a4cd4 Version: $LATEST
2016-23-30 14:23:26: Records count: 1
2016-23-30 14:23:26: {"StringValue":"Hello, World!","IntValue":1,"DoubleValue":2.5,"DateTimeValue":"2016-12-30T05:23:15.010974+00:00","PocoValue":{"InnerValue":"Inner string."}}
2016-23-30 14:23:26: Hello, World!, 1, 2.5, 12/30/16 5:23:15 AM,Inner string.2016-23-30 14:23:26: END RequestId: 221a6c3d-52a1-4c26-b8c1-f3fde43a4cd4
2016-23-30 14:23:26: REPORT RequestId: 221a6c3d-52a1-4c26-b8c1-f3fde43a4cd4 Duration: 5008.18 ms    Billed Duration: 5100 ms    Memory Size: 128 MB Max Memory Used: 29 MB  

AWSコンソールからも無事に出力されています。
f:id:ohke:20161230155639j:plain

まとめ

トランザクションと永続化を前提とするRDBなどと比較すると、Kinesis Streamの考え方・使い方は一種独特で、慣れが必要だと思います。
シャードやシーケンス番号を考慮するともっと複雑になってきますので、まずは最初の足がかりとして1レコードのPut/Getを作ってみました。