今回はKinesis Firehoseを使って、Lambdaから受け取ったレコードをS3に出力させてみます。
Kinesis Firehoseの作成
Kinesis Firehoseは受信したレコードをプログラミングゼロでS3/Redshift/Elasticsearchへ流し込むことができるストリームサービスで、現在は米国リージョンでのみの提供されています。
Amazon Kinesis Firehose(ストリーミングデータを AWS へ簡単にロード) | AWS
今回は、S3(delivery-s3バケット)に出力するKinesis Firehose(delivery-stream)を作成します。 - 60秒間隔(または5KB単位)でS3へ出力され、それまではFirehose内でバッファリングされます - GZIP形式で出力します
Lambdaの実装
次に作成したFirehoseへレコードをPUTするLambdaを実装します。引数をJSON文字列にして、そのままFirehoseへPUTしています。
- Amazon.KinesisFirehoseを使うため、project.jsonではdependenciesにAWSSDK.KinesisFirehose
を追加してください
using System.IO; using System.Net; using System.Text; using Amazon; using Amazon.KinesisFirehose; using Amazon.KinesisFirehose.Model; using Amazon.Lambda.Core; using Newtonsoft.Json; namespace AWSLambda { public class KinesisFirehosePutFunction { private AmazonKinesisFirehoseClient _firehoseClient = new AmazonKinesisFirehoseClient(RegionEndpoint.USWest2); private string _firehoseName = "delivery-stream"; [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))] public async System.Threading.Tasks.Task<string> KinesisFirehosePutHandler(KinesisFirehoseHandlerInput input, ILambdaContext context) { var data = JsonConvert.SerializeObject(input); // Amazon.KinesisFirehose.Model.RecordインスタンスのDataプロパティにPUTするデータを詰める var record = new Record() { Data = new MemoryStream(Encoding.UTF8.GetBytes(data)) }; // Kinesis Firehose(ストリーム名:delivery-stream)へ作成したRecordインスタンスをPUTする var response = await _firehoseClient.PutRecordAsync(_firehoseName, record); if (response.HttpStatusCode == HttpStatusCode.OK) { return $"RecordId: {response.RecordId}"; } else { return "Error"; } } } public class KinesisFirehoseHandlerInput { public string StringValue { get; set; } public int IntValue { get; set; } public double DoubleValue { get; set; } } }
Lambdaを実行するロールには、firehose:PutRecordの許可権限を付与します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "firehose:PutRecord" ], "Resource": "arn:aws:firehose:us-west-2:XXXXXXXXXXXX:deliverystream/delivery-stream" } ] }
Lambdaに適当な引数を与えて実行すると、指定したバケットのyyyy/mm/dd/hhフォルダ配下にストリーム名-バージョン-タイムスタンプ-GUID.gz
のファイルが作成されます。
{"StringValue":"text1","IntValue":10,"DoubleValue":2.5}
また、インターバルを1分で指定しており、例えば数秒間で2レコードを受信した場合は、以下のように1ファイルへマージされてS3へ出力されます。
{"StringValue":"text2","IntValue":10,"DoubleValue":2.5}{"StringValue":"text3","IntValue":10,"DoubleValue":2.5}