Kinesis Firehoseを使ってLambda(C#)からS3にファイル出力する

今回はKinesis Firehoseを使って、Lambdaから受け取ったレコードをS3に出力させてみます。

Kinesis Firehoseの作成

Kinesis Firehoseは受信したレコードをプログラミングゼロでS3/Redshift/Elasticsearchへ流し込むことができるストリームサービスで、現在は米国リージョンでのみの提供されています。

Amazon Kinesis Firehose(ストリーミングデータを AWS へ簡単にロード) | AWS

今回は、S3(delivery-s3バケット)に出力するKinesis Firehose(delivery-stream)を作成します。 - 60秒間隔(または5KB単位)でS3へ出力され、それまではFirehose内でバッファリングされます - GZIP形式で出力します

f:id:ohke:20170331085712j:plain:w400

Lambdaの実装

次に作成したFirehoseへレコードをPUTするLambdaを実装します。引数をJSON文字列にして、そのままFirehoseへPUTしています。 - Amazon.KinesisFirehoseを使うため、project.jsonではdependenciesにAWSSDK.KinesisFirehoseを追加してください

using System.IO;
using System.Net;
using System.Text;
using Amazon;
using Amazon.KinesisFirehose;
using Amazon.KinesisFirehose.Model;
using Amazon.Lambda.Core;
using Newtonsoft.Json;

namespace AWSLambda
{
    public class KinesisFirehosePutFunction
    {
        private AmazonKinesisFirehoseClient _firehoseClient = new AmazonKinesisFirehoseClient(RegionEndpoint.USWest2);
        private string _firehoseName = "delivery-stream";

        [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
        public async System.Threading.Tasks.Task<string> KinesisFirehosePutHandler(KinesisFirehoseHandlerInput input, ILambdaContext context)
        {
            var data = JsonConvert.SerializeObject(input);

            // Amazon.KinesisFirehose.Model.RecordインスタンスのDataプロパティにPUTするデータを詰める
            var record = new Record()
            {
                Data = new MemoryStream(Encoding.UTF8.GetBytes(data))
            };

            // Kinesis Firehose(ストリーム名:delivery-stream)へ作成したRecordインスタンスをPUTする
            var response = await _firehoseClient.PutRecordAsync(_firehoseName, record);

            if (response.HttpStatusCode == HttpStatusCode.OK)
            {
                return $"RecordId: {response.RecordId}";
            }
            else
            {
                return "Error";
            }
        }
    }

    public class KinesisFirehoseHandlerInput
    {
        public string StringValue { get; set; }
        public int IntValue { get; set; }
        public double DoubleValue { get; set; }
    }
}

Lambdaを実行するロールには、firehose:PutRecordの許可権限を付与します。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "firehose:PutRecord"
      ],
      "Resource": "arn:aws:firehose:us-west-2:XXXXXXXXXXXX:deliverystream/delivery-stream"
    }
  ]
}

Lambdaに適当な引数を与えて実行すると、指定したバケットのyyyy/mm/dd/hhフォルダ配下にストリーム名-バージョン-タイムスタンプ-GUID.gzのファイルが作成されます。

f:id:ohke:20170331090148j:plain

{"StringValue":"text1","IntValue":10,"DoubleValue":2.5}

また、インターバルを1分で指定しており、例えば数秒間で2レコードを受信した場合は、以下のように1ファイルへマージされてS3へ出力されます。

{"StringValue":"text2","IntValue":10,"DoubleValue":2.5}{"StringValue":"text3","IntValue":10,"DoubleValue":2.5}