Amazon EMRのステップを使ってPySparkバッチアプリケーションを実装する

前回に引き続きEMRについてです。今回はEMRのステップを用いてPySparkのバッチアプリケーションを実装していきます。

ohke.hateblo.jp

EMRのステップ

前回は、JupyterノートブックからSpark環境へジョブをリクエストしていました。これは分析やモデリングの過程のアドホックなユースケースを想定していました。 これをバッチアプリケーションとしてプロダクション環境へそのまま持っていくと、いくつか不都合が発生します。

  • 前回の構成は共用を想定したクラスタのため、巨大なジョブの場合はリソース不足に陥りやすい
    • 他のジョブで忙しくしていると完了も遅れる
  • 専用のクラスタを立てる構成でも、起動させ続けてしまうと、ほとんどの時間でジョブは実行されないためコストパフォーマンスが悪い
    • 利用都度、クラスタを作成・終了をすることもできるが、ジョブのステータスを監視するのは面倒
  • Livyでジョブの実行をリクエストしていたが、インタラクティブに操作するわけではないので余分

EMRではこういったユースケースに対応する機能として、ステップが提供されています。ステップを使うと クラスタの構築 -> ジョブの実行 (1つ以上) -> クラスタの削除 という一連のフローをシンプルに実現できます。

準備

AWS CLIを使ってステップを実行する前に、いくつか準備必要です。

EMR操作権限の付与

AWS CLIを実行するロールに、EMRの操作権限が付与されている必要があります。
予めAmazonElasticMapReduceFullAccessポリシーを付与しておきます。

設定ファイルの作成

前回同様Python 3.6環境を使うようにSparkを設定する必要があるため、config.jsonファイルとしてローカルに作成しておきます。

[{
    "Classification": "spark-env",
    "Configurations": [{
        "Classification": "export",
        "Properties": {
            "PYSPARK_PYTHON": "/usr/bin/python3"
        }
    }]
}]

スクリプトのアップロード

前回作成したMovieLens 25mのPySparkレコメンドのコードをmovielens.pyとして作成し、S3にアップロード (ここでは s3://emr-temporary/steps/movielens.py ) しておきます。

ほぼ前回と同じですが、SparkSessionの取得を明示的に記述している点 (★) のみ修正しています (Sparkmagicでは裏でこれをやってくれていました) 。

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS

# SparkSessionの取得 (★)
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

# スキーマを定義
schema = StructType([
    StructField("user", LongType(), True),
    StructField("item", LongType(), True),
    StructField("rating", FloatType(), True)
])

# sparkのDataFrameを作成
ratings = spark.read.csv("s3://emr-temporary/input/ratings.csv", schema=schema, header=False, sep=",")
# 欠損値を含む行を削除
ratings = ratings.dropna("any")

# モデルの作成
als = ALS(
    rank=20, maxIter=10, regParam=0.1, userCol="user", itemCol="item", ratingCol="rating", seed=0
)

# 学習
model = als.fit(ratings)

# 全ユーザのトップ10を予測
predicts = model.recommendForAllUsers(
    numItems=10
).select(
    "user", "recommendations.item", "recommendations.rating"
)

# 結果の出力
predicts.write.json("s3://emr-temporary/output/results")

ステップの実行

ではAWS CLIからSparkジョブをステップ実行します。

以下のようにemr create-clusterサブコマンドを使います。--stepsオプションにステップ実行の定義を記述します。

  • --stepsのJarにはcommand-runner.jar、引数 (Args) にアップロードしたスクリプトをそれぞれ指定
    • 実際に実行されるコマンドは hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit s3://emr-temporary/steps/movielens.py となります
  • --configurationsにローカルのconfig.jsonのファイルパスを指定
  • -- auto-terminateとすることで、実行後にクラスタが削除されます
$ aws emr create-cluster --name "movielens-cluter" --release-label emr-5.29.0 \
    --applications Name=Spark --instance-type m5.xlarge --instance-count 3 \
    --auto-terminate --ec2-attributes KeyName=mykey --use-default-roles 
    --configurations file://./config.json \
    --log-uri s3://aws-logs-000000000000-ap-northeast-1/elasticmapreduce/ \
    --steps Type=Spark,Name="movielens",Jar="command-runner.jar",Args=[s3://emr-temporary/steps/movielens.py],ActionOnFailure=CONTINUE
{
    "ClusterId": "j-K7TZ7758JFFT",
    "ClusterArn": "arn:aws:elasticmapreduce:ap-northeast-1:000000000000:cluster/j-2OLL9SP1TA1KW"
}

ちなみに複数のステップを連ねる場合、以下のように記述します。

-- steps \
   Type=Spark,Name="step1",Jar="command-runner.jar",Args=[s3://some/step1.py],ActionOnFailure=CONTINUE \
   Type=Spark,Name="step2",Jar="command-runner.jar",Args=[s3://some/step2.py],ActionOnFailure=CONTINUE \
   ...

このコマンドでクラスタの構築 -> ステップの実行 -> クラスタの削除まで行われます。

f:id:ohke:20200420133814p:plain

実行に失敗した場合は以下のようになります。--log-uriオプションで出力先を指定していると、ログファイルも確認できるようになります。

f:id:ohke:20200420134046p:plain

まとめ

EMRのステップを用いたバッチアプリケーションの実装方法について整理しました。これにCloudwatchやLambdaなどと組み合わせることで、様々なイベントをトリガとして分散処理を走らせるといったことが可能になりますね。

CIに組み込んで利用する場合、適宜検討しないといけないこともありますので、そのあたりは要件を勘案しながら設計する必要があります。

  • スクリプトファイルのバージョン指定をどうやって行うべきか (1つのファイルを参照してデプロイ時に上書きする、複数バージョンのファイルをアップロードしておいてデプロイ時にコマンドライン引数を切り替えるか、など)
  • パラメータ (実行時引数) はどうやって渡すべきか (コマンドライン引数で渡す、入力ファイルに含めて渡す、など)