Kinesis FirehoseでS3にアップロードしたファイルをAthenaで検索する

前回の投稿ではLambdaからエンキューされたメッセージをKinesis FirehoseでS3までアップロードしました。
今回はAthenaを使ってこのアップロードしたファイルをSQLで検索できるようにします。

Athena

S3バケットのファイルからSQLライクな構文で検索できるサーバレスなサービスで、スキャンされたデータ量に対してのみ課金されます。 現時点(2017/4/7)では米国リージョンのみの提供となっています。

Amazon Athena (サーバーレスのインタラクティブなクエリサービス) | AWS

今回は前回作成したバケットに対してAthenaで検索できるようにします。
また、あわせてパーティションも設定・作成します。 デフォルトだとバケット内の全ファイルをスキャンしますが、パーティションを使うことでスキャンするディレクトリを限定することができます。

DBとテーブルの作成

まずはAthenaでDBとテーブルを作成します。

Category Manager→Add tableでtestdbとtest_tableを作成します。

  • 前回作成したs3://delivery-s3/を選択します

f:id:ohke:20170406213621p:plain

データ形式JSONを指定します。

f:id:ohke:20170406213804p:plain

名前と型を指定して、3つのカラムを作成します。この時、JSONのキー名と同じカラム名にすることでマッピングします。

f:id:ohke:20170406213850p:plain

最後にパーティションの設定をして、完了です。
Kinesis Firehoseではバケット以下にyyyy/mm/dd/hhのフォルダを作成しますが、今回は日単位でパーティションを作成するため、year、month、dayをパーティションにします。

f:id:ohke:20170406214037p:plain

すると、SQLが実行され、テーブルが作成されます。

CREATE EXTERNAL TABLE IF NOT EXISTS testdb.test_table (
  `StringValue` string,
  `IntValue` int,
  `DoubleValue` double 
) PARTITIONED BY (
  year string,
  month string,
  day string 
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://delivery-s3/'
TBLPROPERTIES ('has_encrypted_data'='false')

パーティションの作成

これでもうSQLを実行できるようになりますが、パーティションを作成しておきます。

2017/03/30でパーティションを作る場合は、以下のようなSQLになります。 同様に2017/04/06も作成しておきます。

ALTER TABLE testdb.test_table ADD PARTITION (year='2017',month='03',day='30') location 's3://delivery-s3/2017/03/30/'

作成したパーティションは、show partitionsで一覧を見れます。

show partitions testdb.test_table;
year=2017/month=03/day=30
year=2017/month=04/day=06

SQLの実行

試しに全件を取得してみます。

  • パーティションで設定したカラム(year、month、day)も表示されていることを確認してください
select * from testdb.test_table;

f:id:ohke:20170406220547p:plain

パーティションを指定して検索する場合、以下のようにwhereでyear、month、dayを指定します。

select * from testdb.test_table where year='2017' and month = '03' and day = '30' and stringvalue = 'text2';

スキャンされたデータ量を見ると、0.19KBから0.13KBへ減っています。 パーティションによってスキャンするファイルが2017/03/30以下のみになったためです。

f:id:ohke:20170406220440p:plain

Kinesis Firehoseを使ってLambda(C#)からS3にファイル出力する

今回はKinesis Firehoseを使って、Lambdaから受け取ったレコードをS3に出力させてみます。

Kinesis Firehoseの作成

Kinesis Firehoseは受信したレコードをプログラミングゼロでS3/Redshift/Elasticsearchへ流し込むことができるストリームサービスで、現在は米国リージョンでのみの提供されています。

Amazon Kinesis Firehose(ストリーミングデータを AWS へ簡単にロード) | AWS

今回は、S3(delivery-s3バケット)に出力するKinesis Firehose(delivery-stream)を作成します。 - 60秒間隔(または5KB単位)でS3へ出力され、それまではFirehose内でバッファリングされます - GZIP形式で出力します

f:id:ohke:20170331085712j:plain:w400

Lambdaの実装

次に作成したFirehoseへレコードをPUTするLambdaを実装します。引数をJSON文字列にして、そのままFirehoseへPUTしています。 - Amazon.KinesisFirehoseを使うため、project.jsonではdependenciesにAWSSDK.KinesisFirehoseを追加してください

using System.IO;
using System.Net;
using System.Text;
using Amazon;
using Amazon.KinesisFirehose;
using Amazon.KinesisFirehose.Model;
using Amazon.Lambda.Core;
using Newtonsoft.Json;

namespace AWSLambda
{
    public class KinesisFirehosePutFunction
    {
        private AmazonKinesisFirehoseClient _firehoseClient = new AmazonKinesisFirehoseClient(RegionEndpoint.USWest2);
        private string _firehoseName = "delivery-stream";

        [LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
        public async System.Threading.Tasks.Task<string> KinesisFirehosePutHandler(KinesisFirehoseHandlerInput input, ILambdaContext context)
        {
            var data = JsonConvert.SerializeObject(input);

            // Amazon.KinesisFirehose.Model.RecordインスタンスのDataプロパティにPUTするデータを詰める
            var record = new Record()
            {
                Data = new MemoryStream(Encoding.UTF8.GetBytes(data))
            };

            // Kinesis Firehose(ストリーム名:delivery-stream)へ作成したRecordインスタンスをPUTする
            var response = await _firehoseClient.PutRecordAsync(_firehoseName, record);

            if (response.HttpStatusCode == HttpStatusCode.OK)
            {
                return $"RecordId: {response.RecordId}";
            }
            else
            {
                return "Error";
            }
        }
    }

    public class KinesisFirehoseHandlerInput
    {
        public string StringValue { get; set; }
        public int IntValue { get; set; }
        public double DoubleValue { get; set; }
    }
}

Lambdaを実行するロールには、firehose:PutRecordの許可権限を付与します。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "firehose:PutRecord"
      ],
      "Resource": "arn:aws:firehose:us-west-2:XXXXXXXXXXXX:deliverystream/delivery-stream"
    }
  ]
}

Lambdaに適当な引数を与えて実行すると、指定したバケットのyyyy/mm/dd/hhフォルダ配下にストリーム名-バージョン-タイムスタンプ-GUID.gzのファイルが作成されます。

f:id:ohke:20170331090148j:plain

{"StringValue":"text1","IntValue":10,"DoubleValue":2.5}

また、インターバルを1分で指定しており、例えば数秒間で2レコードを受信した場合は、以下のように1ファイルへマージされてS3へ出力されます。

{"StringValue":"text2","IntValue":10,"DoubleValue":2.5}{"StringValue":"text3","IntValue":10,"DoubleValue":2.5}

C# Regexクラスのインスタンスメソッドと静的メソッドの性能比較

大量のURL文字列を正規表現パターンとマッチングして分類するバッチ処理C#で開発していたところ、実装が進んで分類を増やすと、唐突に割に合わない処理時間となってしまったことがありました。 原因としては正規表現のマッチング方法が非効率的だったためでした。

今回はRegexクラスの静的メソッドを使ったマッチングとインスタンスメソッドを使ったマッチングで性能比較してみます。

C#での正規表現

C#正規表現ではRegexクラス(System.Text.RegularExpressions)が一般的に使われますが、マッチングにおいてはいくつかのアプローチが提供されています。

特にインスタンス化の方法はパフォーマンスに大きな影響を与えることがあり、MSDNでも言及されています。 https://msdn.microsoft.com/ja-jp/library/gg578045(v=vs.110).aspx

マッチング方法は主に2つあります。

Regex静的メソッドを使う

例えば、文字列inputで4桁の数字をマッチングする場合はこんな感じです。

using System.Text.RegularExpressions;

Regex.Match(input, "[0-9]{4}");

メソッドの呼び出しによって正規表現パターン("[0-9]{4}“)がオペレーションコードへ変換されるのですが、この変換結果はキャッシュされるようになっています。 キャッシュされるパターン数はRegex.CacheSizeプロパティで指定できます(デフォルトで15)。

冒頭の事象は、分類が増えたことによってキャッシュする変換済みの正規表現パターンが増え、キャッシュミスが頻発したことによってパフォーマンスが劣化したためでした。

Regexインスタンスメソッドを使う

Regexクラスを正規表現パターンを引数としてインスタンス化する方法もあります。

using System.Text.RegularExpressions;

var regex = new Regex("[0-9]{4}");
regex.Match(input);

インスタンス化することで正規表現パターンから変換されたオペレーションコードをキャッシュにかかわらず持たせることができます。 冒頭の事象では、分類が限られていたということもあり、インスタンス化することで解決しました。

さらにオペレーションコードからMSILまでコンパイルするオプション(RegexOptions.Compiled)も用意されており、繰り返しマッチングする場合はコンパイルしたほうが高速とのことです。

var regex = new Regex("[0-9]{4}", RegexOptions.Compiled);

パフォーマンス比較

手軽なのは静的メソッドを使う方法ですが、何度も同じ正規表現文字列を使う場合にはインスタンスメソッドを使うことがパフォーマンスの面で推奨されています。
1,000,000件のGuid文字列から"[0-9]{4}“をマッチングするコードを4パターンで実装し、実行時間を比較してみました。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text.RegularExpressions;

namespace ConsoleApplication
{
    public class Program
    {
        public static void Main(string[] args)
        {
            long staticMethodTotal = 0;
            long staticMethodNoCacheTotal = 0;
            long instanceMethodTotal = 0;
            long instanceMethodCompiledTotal = 0;

            Console.WriteLine(Regex.CacheSize);

            // 10回平均を求める
            for (var try_num = 0; try_num < 10; try_num++) 
            {
                // 1,000,000個のGUID文字列からなるリストを生成
                var list = new List<string>();
                for (int i = 0; i < 1000000; i++)
                {
                    list.Add(new Guid().ToString());
                }

                var sw = new Stopwatch();

                // パターン1. Regex静的メソッドを使う(キャッシュ有り)
                Regex.CacheSize = 1;
                sw.Start();
                UseStaticMethod(list);
                sw.Stop();
                staticMethodTotal += sw.ElapsedMilliseconds;

                // パターン2. Regex静的メソッドを使う(キャッシュ無し)
                Regex.CacheSize = 0; // キャッシュ数を0にして無効化
                sw.Reset();
                sw.Start();
                UseStaticMethod(list);
                sw.Stop();
                staticMethodNoCacheTotal += sw.ElapsedMilliseconds;

                // パターン3. Regexインスタンスメソッドを使う(コンパイルオプション無効)
                sw.Reset();
                sw.Start();
                UseInstanceMethod(list);
                sw.Stop();
                instanceMethodTotal += sw.ElapsedMilliseconds;

                // パターン4. Regexインスタンスメソッドを使う(コンパイルオプション有効)
                sw.Reset();
                sw.Start();
                UseInstanceMethod(list, true);
                sw.Stop();
                instanceMethodCompiledTotal += sw.ElapsedMilliseconds;
            }

            Console.WriteLine(staticMethodTotal / 10.0);
            Console.WriteLine(staticMethodNoCacheTotal / 10.0);
            Console.WriteLine(instanceMethodTotal / 10.0);
            Console.WriteLine(instanceMethodCompiledTotal / 10.0);
        }

        private static List<bool> UseStaticMethod(List<string> list)
        {
            var result = new List<bool>();

            foreach (var guid in list)
            {
                result.Add(Regex.IsMatch(guid, "[0-9]{4}"));
            }

            return result;
        }

        private static List<bool> UseInstanceMethod(List<string> list, bool compiled = false)
        {
            var regex = compiled ? 
                        new Regex("[0-9]{4}", RegexOptions.Compiled) : 
                        new Regex("[0-9]{4}");

            var result = new List<bool>();

            foreach (var guid in list)
            {
                result.Add(regex.IsMatch(guid));
            }

            return result;
        }
    }
}

結果は結果は以下の通りでした。

  • キャッシュの有無(パターン1.と2.の比較)は強力で、約7倍の性能差となって現れました
    • 暗黙的にキャッシュされるので、少し正規表現パターンを増やしただけなのに性能がガクッと落ちた際はキャッシュミスを疑ってみるのはありです
  • インスタンスメソッドの方が高速(パターン1.と3.の比較)になりましたが、感覚的にはそこまで大きな差がない印象です
    • 純粋にインスタンスの生成コストの差のようで、キャッシュさえ効いていればどっちでも良いですね
  • コンパイル有無(パターン3.と4.の比較)については定説と逆転してしまいました(謎です。。。)
実行時間[ms]
パターン1. Regex静的メソッドを使う(キャッシュ有り) 472
パターン2. Regex静的メソッドを使う(キャッシュ無し) 3329
パターン3. Regexインスタンスメソッドを使う(コンパイルオプション無効) 258
パターン4. Regexインスタンスメソッドを使う(コンパイルオプション有効) 266

今回は単純な正規表現パターンで比較しましたが、バックトラッキングなどを使ってもっと複雑になってくると、より顕著に性能差が広がると思われます。