Amazon EMRでSparkクラスタを作ってSageMakerからジョブを実行する

Jupyter Notebookなどで作業中に、メモリに乗り切らないような大量のデータに対して何らか処理が必要になるケースがしばしばあります。

これを解決するツールの一つとしてAmazon EMRをきちんと知っておきたいというモチベーションで、今回はAmazon EMRでSparkクラスタを構築して、SageMakerノートブックからこのSparkクラスタへジョブを実行させるというのをやってみます。

その前に: Amazon EMRとは何か

Amazon EMR (Elastic MapReduce) は、Apache HadoopやApache Sparkといった分散処理フレームワークとそれらに関連するアプリケーション (Apache HiveやApache Zookeeperなど) をバンドルしたクラスタの構築・管理およびインタフェースの提供などを一括して行うマネージドサービスです。

EMRを用いることで、例えば "メモリ16GB搭載のスレーブ100ノードからなるSparkクラスタの構築" といったリクエストも、コマンド1つで実現できます。

aws.amazon.com

JupyterノートブックとSparkクラスタの接続は、Apache LivyのAPIサーバを介して、Sparkmagicで行います。

livy.apache.org

github.com

Sparkクラスタの構築

まずはSparkクラスタをEMRのコンソール画面から構築していきます。クラスターの作成 > 詳細オプションで行います。

  • クイックオプションでは細かい設定ができないため

ステップ1.ではインストールするEMRのバージョン (5.29.0) と、フレームワーク (SparkとLivy) を選択します。
"ソフトウェア設定の編集" ではPython 3環境 (emr-5.29.0では3.6) が使われるようにJSONで設定を追加している点に注意です。デフォルトでは2.7となります。 (参考)

f:id:ohke:20200419162100p:plain

[{
  "Classification": "spark-env",
  "Configurations": [{
    "Classification": "export",
    "Properties": {
      "PYSPARK_PYTHON": "/usr/bin/python3"
    }
  }]
}]

ステップ2.では、インスタンス構成を選択します。ここではマスタ 1ノードとコア (スレーブ) 2ノードをm5.xlargeで構成しています。

f:id:ohke:20200419155251p:plain

ステップ3.で、クラスタ名 ("alice-cluster") とログの出力先を設定します。追加タグやブートストラップアクションなどもここで設定します。

  • ブートストラップアクションによってクラスタ起動時に実行されるスクリプトを設定することができ、追加ソフトウェアのインストールや環境変数のセットなどを任意に定義できます

f:id:ohke:20200419155501p:plain

最後のステップ4.にてSSHキーとセキュリティグループの設定などを行います。後ほど変更しますが、まずデフォルトのまま構築します。

f:id:ohke:20200419160009p:plain

"クラスターを作成" をクリックすると構築が開始され、完了すると "待機中" のステータスになります。クラスタのIDとしては "j-36SCY8Z3S92HW" が採番されています。

f:id:ohke:20200419162308p:plain

セキュリティグループの設定

SageMakerからSpark (マスタノード) へのアクセスはLivyのAPIサーバ (ポート番号8998) を介して行われます。この通信を許可するようにセキュリティグループを設定します。

予めSageMakerノートブックインスタンスに割り当てるセキュリティグループ ("sagemaker-sg") を作成しておきます。次にマスタノードのセキュリティグループにて、sagemaker-sgからポート番号8998へのインバウンドを許可します。

  • ついでにSSH接続も許可しておきます

f:id:ohke:20200419161610p:plain

以上でクラスタ側の設定は完了です。

SageMakerからSparkクラスタで計算する

ノートブックの作成

次にSageMaker側の設定です。上で作ったセキュリティグループ (sagemaker-sg) を割り当てておきます。

f:id:ohke:20200419163113p:plain

立ち上げたノートブックからSageMakerにアクセスします。

まずはTerminalを開いて、SSHでマスタノードに接続し、pysparkが立ち上げられることを確認します。

  • コンソール画面で "マスタパブリックDNS" のホスト名を確認し、設定したキーファイルを使ってつなぎます
  • Python 3.6が使われていることも要チェックです
sh-4.2$ ssh -i ~/.ssh/key.pem hadoop@ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com
Last login: Sun Apr 19 07:19:03 2020

       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2018.03-release-notes/
20 package(s) needed for security, out of 36 available
Run "sudo yum update" to apply all updates.
                                                                    
EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR    
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R   
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R 
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R 
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR   
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R  
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR
                                                                    
[hadoop@ip-172-31-17-182 ~]$ pyspark
Python 3.6.8 (default, Oct 14 2019, 21:22:53) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/04/19 07:37:36 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Python version 3.6.8 (default, Oct 14 2019 21:22:53)
SparkSession available as 'spark'.
>>>

次にcurlコマンドで以下のようにLivyと通信できることを確認します。JSONが返されていますのでOKですね。

sh-4.2$  curl ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com:8998/sessions
{"from":0,"total":0,"sessions":[]

Sparkmagicを使ってSparkクラスタで計算する

SageMakerにはPySpark向けのSparkmagicカーネルがプリインストールされているので、必要な準備は設定ファイルの作成だけです。

Sparkmagicの設定ファイルをダウンロードし、~/.sparkmagic/config.jsonでリネームします。このconfig.jsonの接続先をlocalhostからマスタノードのホスト名に書き換えます。以下のような感じになるかと思います。

{
  "kernel_python_credentials" : {
    "username": "",
    "password": "",
    "url": "http://ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com:8998",
    "auth": "None"
  },

  "kernel_scala_credentials" : {
    "username": "",
    "password": "",
    "url": "http://ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com:8998",
    "auth": "None"
  },
  "kernel_r_credentials": {
    "username": "",
    "password": "",
    "url": "http://ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com:8998"
  },
  ...
}

"Sparkmagic (PySpark)" でノートブックを立ち上げ、%%infoマジックコマンドでセッション情報が取得できることを確認して準備完了です。

f:id:ohke:20200419164705p:plain

f:id:ohke:20200419201706p:plain

実際にSparkクラスタで計算させます。今回は、MLパッケージを使って実装したMovieLens 100kのレコメンドコードをベースに、MovieLens 25Mへ拡張させます。

grouplens.org

最初に入力ファイルの準備です。MovieLens 25Mデータセットをダウンロード・解凍し、ratings.csvを適当なS3バケットにアップロードします。
Sparkmagicはデフォルトでは全てSparkクラスタに送られて実行されます。ローカル (= ノートブック) にて実行する場合、%%localマジックコマンドを使います。

  • EMRのインスタンスに付与されるデフォルトロール (EMR_EC2_DefaultRole) には全バケットへのフルアクセス権限が付与されています
%%local
import pandas as pd
import boto3
import urllib.request
import zipfile

# MovieLens 25Mをダウンロードして解凍
urllib.request.urlretrieve("http://files.grouplens.org/datasets/movielens/ml-25m.zip", "ml-25m.zip")
with zipfile.ZipFile("./ml-25m.zip") as zip_file:
    zip_file.extractall(".")

# S3にアップロード
s3 = boto3.resource("s3")
bucket = s3.Bucket("emr-temporary")
bucket.upload_file("./ml-25m/ratings.csv", "input/ratings.csv")

次にSparkクラスタで実行されるコードの実装です。S3を入出力先として用いています。
概ね以前の実装と同じですが、Sparkmagicのセル内では変数sparkで接続済みのSparkSessionオブジェクトへアクセスできる点に注意です。

from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS

# スキーマを定義
schema = StructType([
    StructField("user", LongType(), True),
    StructField("item", LongType(), True),
    StructField("rating", FloatType(), True)
])

# sparkのDataFrameを作成
ratings = spark.read.csv("s3://emr-temporary/input/ratings.csv", schema=schema, header=False, sep=",")
# 欠損値を含む行を削除
ratings = ratings.dropna("any")

# モデルの作成
als = ALS(
    rank=20, 
    maxIter=10,
    regParam=0.1,
    userCol="user",
    itemCol="item",
    ratingCol="rating",
    seed=0
)

# 学習
model = als.fit(ratings)

# 全ユーザのトップ10を予測
predicts = model.recommendForAllUsers(
    numItems=10
).select(
    "user",
    "recommendations.item", 
    "recommendations.rating"
)

# 結果の出力
predicts.write.json("s3://emr-temporary/output/results")

実行後、しばらくするとS3に結果が出力されます。

またAWSのEMRのコンソールからSparkのWeb UIへジャンプすることができます。これを見ると、トータル6分くらいで処理が正常完了していることも確認できます。

f:id:ohke:20200419205153p:plain

まとめ

Amazon EMRを使ってSparkクラスタを構築し、SageMakerノートブックからジョブを実行させる方法について整理しました。