Jupyter Notebookなどで作業中に、メモリに乗り切らないような大量のデータに対して何らか処理が必要になるケースがしばしばあります。
これを解決するツールの一つとしてAmazon EMRをきちんと知っておきたいというモチベーションで、今回はAmazon EMRでSparkクラスタを構築して、SageMakerノートブックからこのSparkクラスタへジョブを実行させるというのをやってみます。
その前に: Amazon EMRとは何か
Amazon EMR (Elastic MapReduce) は、Apache HadoopやApache Sparkといった分散処理フレームワークとそれらに関連するアプリケーション (Apache HiveやApache Zookeeperなど) をバンドルしたクラスタの構築・管理およびインタフェースの提供などを一括して行うマネージドサービスです。
EMRを用いることで、例えば "メモリ16GB搭載のスレーブ100ノードからなるSparkクラスタの構築" といったリクエストも、コマンド1つで実現できます。
JupyterノートブックとSparkクラスタの接続は、Apache LivyのAPIサーバを介して、Sparkmagicで行います。
Sparkクラスタの構築
まずはSparkクラスタをEMRのコンソール画面から構築していきます。クラスターの作成 > 詳細オプションで行います。
- クイックオプションでは細かい設定ができないため
ステップ1.ではインストールするEMRのバージョン (5.29.0) と、フレームワーク (SparkとLivy) を選択します。
"ソフトウェア設定の編集" ではPython 3環境 (emr-5.29.0では3.6) が使われるようにJSONで設定を追加している点に注意です。デフォルトでは2.7となります。 (参考)
[{ "Classification": "spark-env", "Configurations": [{ "Classification": "export", "Properties": { "PYSPARK_PYTHON": "/usr/bin/python3" } }] }]
ステップ2.では、インスタンス構成を選択します。ここではマスタ 1ノードとコア (スレーブ) 2ノードをm5.xlargeで構成しています。
ステップ3.で、クラスタ名 ("alice-cluster") とログの出力先を設定します。追加タグやブートストラップアクションなどもここで設定します。
- ブートストラップアクションによってクラスタ起動時に実行されるスクリプトを設定することができ、追加ソフトウェアのインストールや環境変数のセットなどを任意に定義できます
最後のステップ4.にてSSHキーとセキュリティグループの設定などを行います。後ほど変更しますが、まずデフォルトのまま構築します。
"クラスターを作成" をクリックすると構築が開始され、完了すると "待機中" のステータスになります。クラスタのIDとしては "j-36SCY8Z3S92HW" が採番されています。
セキュリティグループの設定
SageMakerからSpark (マスタノード) へのアクセスはLivyのAPIサーバ (ポート番号8998) を介して行われます。この通信を許可するようにセキュリティグループを設定します。
予めSageMakerノートブックインスタンスに割り当てるセキュリティグループ ("sagemaker-sg") を作成しておきます。次にマスタノードのセキュリティグループにて、sagemaker-sgからポート番号8998へのインバウンドを許可します。
- ついでにSSH接続も許可しておきます
以上でクラスタ側の設定は完了です。
SageMakerからSparkクラスタで計算する
ノートブックの作成
次にSageMaker側の設定です。上で作ったセキュリティグループ (sagemaker-sg) を割り当てておきます。
立ち上げたノートブックからSageMakerにアクセスします。
まずはTerminalを開いて、SSHでマスタノードに接続し、pysparkが立ち上げられることを確認します。
- コンソール画面で "マスタパブリックDNS" のホスト名を確認し、設定したキーファイルを使ってつなぎます
- Python 3.6が使われていることも要チェックです
sh-4.2$ ssh -i ~/.ssh/key.pem hadoop@ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com Last login: Sun Apr 19 07:19:03 2020 __| __|_ ) _| ( / Amazon Linux AMI ___|\___|___| https://aws.amazon.com/amazon-linux-ami/2018.03-release-notes/ 20 package(s) needed for security, out of 36 available Run "sudo yum update" to apply all updates. EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R E::::E M::::::M:::M M:::M::::::M R:::R R::::R E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R E::::E M:::::M M:::M M:::::M R:::R R::::R E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR [hadoop@ip-172-31-17-182 ~]$ pyspark Python 3.6.8 (default, Oct 14 2019, 21:22:53) [GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux Type "help", "copyright", "credits" or "license" for more information. Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 20/04/19 07:37:36 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.4.4 /_/ Using Python version 3.6.8 (default, Oct 14 2019 21:22:53) SparkSession available as 'spark'. >>>
次にcurlコマンドで以下のようにLivyと通信できることを確認します。JSONが返されていますのでOKですね。
sh-4.2$ curl ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com:8998/sessions {"from":0,"total":0,"sessions":[]
Sparkmagicを使ってSparkクラスタで計算する
SageMakerにはPySpark向けのSparkmagicカーネルがプリインストールされているので、必要な準備は設定ファイルの作成だけです。
Sparkmagicの設定ファイルをダウンロードし、~/.sparkmagic/config.json
でリネームします。このconfig.jsonの接続先をlocalhostからマスタノードのホスト名に書き換えます。以下のような感じになるかと思います。
{ "kernel_python_credentials" : { "username": "", "password": "", "url": "http://ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com:8998", "auth": "None" }, "kernel_scala_credentials" : { "username": "", "password": "", "url": "http://ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com:8998", "auth": "None" }, "kernel_r_credentials": { "username": "", "password": "", "url": "http://ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com:8998" }, ... }
"Sparkmagic (PySpark)" でノートブックを立ち上げ、%%info
マジックコマンドでセッション情報が取得できることを確認して準備完了です。
実際にSparkクラスタで計算させます。今回は、MLパッケージを使って実装したMovieLens 100kのレコメンドコードをベースに、MovieLens 25Mへ拡張させます。
最初に入力ファイルの準備です。MovieLens 25Mデータセットをダウンロード・解凍し、ratings.csvを適当なS3バケットにアップロードします。
Sparkmagicはデフォルトでは全てSparkクラスタに送られて実行されます。ローカル (= ノートブック) にて実行する場合、%%local
マジックコマンドを使います。
- EMRのインスタンスに付与されるデフォルトロール (EMR_EC2_DefaultRole) には全バケットへのフルアクセス権限が付与されています
%%local import pandas as pd import boto3 import urllib.request import zipfile # MovieLens 25Mをダウンロードして解凍 urllib.request.urlretrieve("http://files.grouplens.org/datasets/movielens/ml-25m.zip", "ml-25m.zip") with zipfile.ZipFile("./ml-25m.zip") as zip_file: zip_file.extractall(".") # S3にアップロード s3 = boto3.resource("s3") bucket = s3.Bucket("emr-temporary") bucket.upload_file("./ml-25m/ratings.csv", "input/ratings.csv")
次にSparkクラスタで実行されるコードの実装です。S3を入出力先として用いています。
概ね以前の実装と同じですが、Sparkmagicのセル内では変数spark
で接続済みのSparkSessionオブジェクトへアクセスできる点に注意です。
from pyspark.sql.types import * from pyspark.ml.recommendation import ALS # スキーマを定義 schema = StructType([ StructField("user", LongType(), True), StructField("item", LongType(), True), StructField("rating", FloatType(), True) ]) # sparkのDataFrameを作成 ratings = spark.read.csv("s3://emr-temporary/input/ratings.csv", schema=schema, header=False, sep=",") # 欠損値を含む行を削除 ratings = ratings.dropna("any") # モデルの作成 als = ALS( rank=20, maxIter=10, regParam=0.1, userCol="user", itemCol="item", ratingCol="rating", seed=0 ) # 学習 model = als.fit(ratings) # 全ユーザのトップ10を予測 predicts = model.recommendForAllUsers( numItems=10 ).select( "user", "recommendations.item", "recommendations.rating" ) # 結果の出力 predicts.write.json("s3://emr-temporary/output/results")
実行後、しばらくするとS3に結果が出力されます。
またAWSのEMRのコンソールからSparkのWeb UIへジャンプすることができます。これを見ると、トータル6分くらいで処理が正常完了していることも確認できます。
まとめ
Amazon EMRを使ってSparkクラスタを構築し、SageMakerノートブックからジョブを実行させる方法について整理しました。