AWS Kinesis StreamへPut/GetするLambdaをC#で書いてみる

今回はKinesis StreamへPut/GetするLambdaをC#で書いてみます。
データフローとしてはLambda→Kinesis Stream→Lambdaとなります。

Kinesis Streamの作成

Kinesis Streamは、デバイス情報やログなどの継続して生成され続けるデータを効率的に処理するためのマネージドサービスです。
シャード(shard)と呼ばれるスループット単位で入出力され、データの保持期間は24時間という制限があります。

今回は1シャードのみのKinesis Streamを"test"という名前で作成します。

f:id:ohke:20161230154449j:plain

なお、Kinesisは無料枠から外れ、一切の利用がなかった場合でもシャード数に応じて時間料金が発生します。 詳しくはこちらでご確認ください。

Lambdaの実装

まずはKinesis Streamから入出力するデータのクラス(単純なPOCO)を定義します。

public class KinesisRecord
{
    public string StringValue { get; set; }
    public int IntValue { get; set; }
    public double DoubleValue { get; set; }
    public DateTime DateTimeValue { get; set; }
    public PocoClass PocoValue { get; set; }
}

public class PocoClass
{
    public string InnerValue { get; set; }
}

次にPutする側のLambdaとして、KinesisPutFunctionを実装・アップロードします。
PutRecordRequestを作成して、AmazonKinesisClientでPutさせます。

  • データの一部を保持するKinesisPutHandlerInputクラスのオブジェクトを引数としています。
    • JSONリアライザ属性を付与しているので、JSON形式で入出力されます。
  • AmazonKinesisClientなどを使うため、AWSSDKをインストールする必要があります。
  • PutRecordRequestのPartitionKeyは入出力するシャードの選択に使われます。
    • 今回は1シャードのみなので固定値にしています。
    • 複数のシャードを使う場合は、偏らせないためにカーディナリティの高い値を設定する必要があります。
using System;
using System.IO;
using System.Text;
using Amazon;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using Amazon.Lambda.Core;
using Newtonsoft.Json;

namespace AWSLambda
{
    public class KinesisPutFunction
    {
        [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
        public PutRecordResponse KinesisPutHandler(KinesisPutHandlerInput input, ILambdaContext context)
        {
            // Kinesis StreamへPUTしたいオブジェクト
            var record = new KinesisRecord
            {
                StringValue = input.StringValue,
                IntValue = input.IntValue,
                DoubleValue = input.DoubleValue,
                DateTimeValue = DateTime.Now,
                PocoValue = new PocoClass { InnerValue = "Inner string."}
            };

            var request = new PutRecordRequest
            {
                // 先ほど作成したKinesis Streamの名前
                StreamName = "test",
                // 1シャードのため固定値
                PartitionKey = "test-partitionkey-0",
                // JSON形式でシリアライズ 
                Data = new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(record)))
            };

            var client = new AmazonKinesisClient(RegionEndpoint.USWest2);
            // PUTは非同期のみ
            var result = client.PutRecordAsync(request);
            result.Wait();

            // PutRecordResponseオブジェクトをそのままJSON形式で返す
            return result.Result;
        }
    }

    public class KinesisPutHandlerInput
    {
        public string StringValue { get; set; }
        public int IntValue { get; set; }
        public double DoubleValue { get; set; }
    }
}

最後にGetする側のLambdaとして、KinesisGetFunctionを実装・アップロードします。
今回はKinesis StreamへのPutをトリガーとして実行させます。

  • Kinesis Streamをトリガとする場合、KinesisEventが引数として渡されます。
    • KinesisEventからPutされたレコードを取り出します。
using System.IO;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Newtonsoft.Json;

namespace AWSLambda
{
    public class KinesisGetFunction
    {
        [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
        public void KinesisGetHandler(KinesisEvent kinesisEvent, ILambdaContext context)
        {
            // 今回は1レコード単位でトリガーされるので、1となる
            context.Logger.LogLine($"Records count: {kinesisEvent.Records.Count}");

            foreach (var record in kinesisEvent.Records)
            {
                // MemoryStreamオブジェクトのため、JSON文字列に変換してからデシリアライズ
                var jsonString = new StreamReader(record.Kinesis.Data).ReadToEnd();
                context.Logger.Log(jsonString);

                var data = JsonConvert.DeserializeObject<KinesisRecord>(jsonString);
                context.Logger.Log($"{data.StringValue}, " +
                                   $"{data.IntValue}, " +
                                   $"{data.DoubleValue}, " +
                                   $"{data.DateTimeValue}," +
                                   $"{data.PocoValue.InnerValue}");
            }
        }
    }
}

Kinesis Streamをポーリングさせるために、KinesisGetFunctionのアップロード後にAWSコンソールからトリガの設定を行います。

  • Batch sizeはLambdaの呼び出しごとに読み込むレコード数を指定します。
    • 1レコードずつ取り出したいので、"1"で設定
  • Starting positionはどのレコードの読み込みから開始すべきかを指定します。
    • Kinesis Streamが空の状態からポーリングを開始するので、"Latest"で設定
      f:id:ohke:20161230154721j:plain:w350

テスト

それではKinesisPutFunctionから起動してKinesisStreamへレコードをPutさせ、トリガで起動したKinesisGetFunctionからレコードをGetさせる、一連の流れをテストしてみます。

AWSコンソールからも可能ですが、今回はVisual StudioAWS Explorer(メニューの[表示]>[AWS Explorer])から実行してみます。
作成したKinesisPutFunctionを選択して[Test Function]タブを開き、リクエストをJSONで記述して、Invokeで実行します。

  • シーケンス番号やシャードIDが返ってきているため、Putに成功しています。

f:id:ohke:20161230155436j:plain

次にAWS ExplorerからKinesisGetFunctionを選択して[Logs]タブを開き、ログファイルをダウンロードします。
f:id:ohke:20161230155759j:plain

Putしたレコードの値が取り出せていることが確認できるかと思います。

2016-23-30 14:23:21: START RequestId: 221a6c3d-52a1-4c26-b8c1-f3fde43a4cd4 Version: $LATEST
2016-23-30 14:23:26: Records count: 1
2016-23-30 14:23:26: {"StringValue":"Hello, World!","IntValue":1,"DoubleValue":2.5,"DateTimeValue":"2016-12-30T05:23:15.010974+00:00","PocoValue":{"InnerValue":"Inner string."}}
2016-23-30 14:23:26: Hello, World!, 1, 2.5, 12/30/16 5:23:15 AM,Inner string.2016-23-30 14:23:26: END RequestId: 221a6c3d-52a1-4c26-b8c1-f3fde43a4cd4
2016-23-30 14:23:26: REPORT RequestId: 221a6c3d-52a1-4c26-b8c1-f3fde43a4cd4 Duration: 5008.18 ms    Billed Duration: 5100 ms    Memory Size: 128 MB Max Memory Used: 29 MB  

AWSコンソールからも無事に出力されています。
f:id:ohke:20161230155639j:plain

まとめ

トランザクションと永続化を前提とするRDBなどと比較すると、Kinesis Streamの考え方・使い方は一種独特で、慣れが必要だと思います。
シャードやシーケンス番号を考慮するともっと複雑になってきますので、まずは最初の足がかりとして1レコードのPut/Getを作ってみました。