Amazon EMRでSparkクラスタを作ってSageMakerからジョブを実行する

Jupyter Notebookなどで作業中に、メモリに乗り切らないような大量のデータに対して何らか処理が必要になるケースがしばしばあります。

これを解決するツールの一つとしてAmazon EMRをきちんと知っておきたいというモチベーションで、今回はAmazon EMRでSparkクラスタを構築して、SageMakerノートブックからこのSparkクラスタへジョブを実行させるというのをやってみます。

その前に: Amazon EMRとは何か

Amazon EMR (Elastic MapReduce) は、Apache HadoopやApache Sparkといった分散処理フレームワークとそれらに関連するアプリケーション (Apache HiveやApache Zookeeperなど) をバンドルしたクラスタの構築・管理およびインタフェースの提供などを一括して行うマネージドサービスです。

EMRを用いることで、例えば "メモリ16GB搭載のスレーブ100ノードからなるSparkクラスタの構築" といったリクエストも、コマンド1つで実現できます。

aws.amazon.com

JupyterノートブックとSparkクラスタの接続は、Apache LivyのAPIサーバを介して、Sparkmagicで行います。

livy.apache.org

github.com

Sparkクラスタの構築

まずはSparkクラスタをEMRのコンソール画面から構築していきます。クラスターの作成 > 詳細オプションで行います。

  • クイックオプションでは細かい設定ができないため

ステップ1.ではインストールするEMRのバージョン (5.29.0) と、フレームワーク (SparkとLivy) を選択します。
"ソフトウェア設定の編集" ではPython 3環境 (emr-5.29.0では3.6) が使われるようにJSONで設定を追加している点に注意です。デフォルトでは2.7となります。 (参考)

f:id:ohke:20200419162100p:plain

[{
  "Classification": "spark-env",
  "Configurations": [{
    "Classification": "export",
    "Properties": {
      "PYSPARK_PYTHON": "/usr/bin/python3"
    }
  }]
}]

ステップ2.では、インスタンス構成を選択します。ここではマスタ 1ノードとコア (スレーブ) 2ノードをm5.xlargeで構成しています。

f:id:ohke:20200419155251p:plain

ステップ3.で、クラスタ名 ("alice-cluster") とログの出力先を設定します。追加タグやブートストラップアクションなどもここで設定します。

  • ブートストラップアクションによってクラスタ起動時に実行されるスクリプトを設定することができ、追加ソフトウェアのインストールや環境変数のセットなどを任意に定義できます

f:id:ohke:20200419155501p:plain

最後のステップ4.にてSSHキーとセキュリティグループの設定などを行います。後ほど変更しますが、まずデフォルトのまま構築します。

f:id:ohke:20200419160009p:plain

"クラスターを作成" をクリックすると構築が開始され、完了すると "待機中" のステータスになります。クラスタのIDとしては "j-36SCY8Z3S92HW" が採番されています。

f:id:ohke:20200419162308p:plain

セキュリティグループの設定

SageMakerからSpark (マスタノード) へのアクセスはLivyのAPIサーバ (ポート番号8998) を介して行われます。この通信を許可するようにセキュリティグループを設定します。

予めSageMakerノートブックインスタンスに割り当てるセキュリティグループ ("sagemaker-sg") を作成しておきます。次にマスタノードのセキュリティグループにて、sagemaker-sgからポート番号8998へのインバウンドを許可します。

  • ついでにSSH接続も許可しておきます

f:id:ohke:20200419161610p:plain

以上でクラスタ側の設定は完了です。

SageMakerからSparkクラスタで計算する

ノートブックの作成

次にSageMaker側の設定です。上で作ったセキュリティグループ (sagemaker-sg) を割り当てておきます。

f:id:ohke:20200419163113p:plain

立ち上げたノートブックからSageMakerにアクセスします。

まずはTerminalを開いて、SSHでマスタノードに接続し、pysparkが立ち上げられることを確認します。

  • コンソール画面で "マスタパブリックDNS" のホスト名を確認し、設定したキーファイルを使ってつなぎます
  • Python 3.6が使われていることも要チェックです
sh-4.2$ ssh -i ~/.ssh/key.pem hadoop@ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com
Last login: Sun Apr 19 07:19:03 2020

       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2018.03-release-notes/
20 package(s) needed for security, out of 36 available
Run "sudo yum update" to apply all updates.
                                                                    
EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR    
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R   
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R 
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R 
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR   
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R  
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR
                                                                    
[hadoop@ip-172-31-17-182 ~]$ pyspark
Python 3.6.8 (default, Oct 14 2019, 21:22:53) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/04/19 07:37:36 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Python version 3.6.8 (default, Oct 14 2019 21:22:53)
SparkSession available as 'spark'.
>>>

次にcurlコマンドで以下のようにLivyと通信できることを確認します。JSONが返されていますのでOKですね。

sh-4.2$  curl ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com:8998/sessions
{"from":0,"total":0,"sessions":[]

Sparkmagicを使ってSparkクラスタで計算する

SageMakerにはPySpark向けのSparkmagicカーネルがプリインストールされているので、必要な準備は設定ファイルの作成だけです。

Sparkmagicの設定ファイルをダウンロードし、~/.sparkmagic/config.jsonでリネームします。このconfig.jsonの接続先をlocalhostからマスタノードのホスト名に書き換えます。以下のような感じになるかと思います。

{
  "kernel_python_credentials" : {
    "username": "",
    "password": "",
    "url": "http://ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com:8998",
    "auth": "None"
  },

  "kernel_scala_credentials" : {
    "username": "",
    "password": "",
    "url": "http://ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com:8998",
    "auth": "None"
  },
  "kernel_r_credentials": {
    "username": "",
    "password": "",
    "url": "http://ec2-000-000-000-000.ap-northeast-1.compute.amazonaws.com:8998"
  },
  ...
}

"Sparkmagic (PySpark)" でノートブックを立ち上げ、%%infoマジックコマンドでセッション情報が取得できることを確認して準備完了です。

f:id:ohke:20200419164705p:plain

f:id:ohke:20200419201706p:plain

実際にSparkクラスタで計算させます。今回は、MLパッケージを使って実装したMovieLens 100kのレコメンドコードをベースに、MovieLens 25Mへ拡張させます。

grouplens.org

最初に入力ファイルの準備です。MovieLens 25Mデータセットをダウンロード・解凍し、ratings.csvを適当なS3バケットにアップロードします。
Sparkmagicはデフォルトでは全てSparkクラスタに送られて実行されます。ローカル (= ノートブック) にて実行する場合、%%localマジックコマンドを使います。

  • EMRのインスタンスに付与されるデフォルトロール (EMR_EC2_DefaultRole) には全バケットへのフルアクセス権限が付与されています
%%local
import pandas as pd
import boto3
import urllib.request
import zipfile

# MovieLens 25Mをダウンロードして解凍
urllib.request.urlretrieve("http://files.grouplens.org/datasets/movielens/ml-25m.zip", "ml-25m.zip")
with zipfile.ZipFile("./ml-25m.zip") as zip_file:
    zip_file.extractall(".")

# S3にアップロード
s3 = boto3.resource("s3")
bucket = s3.Bucket("emr-temporary")
bucket.upload_file("./ml-25m/ratings.csv", "input/ratings.csv")

次にSparkクラスタで実行されるコードの実装です。S3を入出力先として用いています。
概ね以前の実装と同じですが、Sparkmagicのセル内では変数sparkで接続済みのSparkSessionオブジェクトへアクセスできる点に注意です。

from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS

# スキーマを定義
schema = StructType([
    StructField("user", LongType(), True),
    StructField("item", LongType(), True),
    StructField("rating", FloatType(), True)
])

# sparkのDataFrameを作成
ratings = spark.read.csv("s3://emr-temporary/input/ratings.csv", schema=schema, header=False, sep=",")
# 欠損値を含む行を削除
ratings = ratings.dropna("any")

# モデルの作成
als = ALS(
    rank=20, 
    maxIter=10,
    regParam=0.1,
    userCol="user",
    itemCol="item",
    ratingCol="rating",
    seed=0
)

# 学習
model = als.fit(ratings)

# 全ユーザのトップ10を予測
predicts = model.recommendForAllUsers(
    numItems=10
).select(
    "user",
    "recommendations.item", 
    "recommendations.rating"
)

# 結果の出力
predicts.write.json("s3://emr-temporary/output/results")

実行後、しばらくするとS3に結果が出力されます。

またAWSのEMRのコンソールからSparkのWeb UIへジャンプすることができます。これを見ると、トータル6分くらいで処理が正常完了していることも確認できます。

f:id:ohke:20200419205153p:plain

まとめ

Amazon EMRを使ってSparkクラスタを構築し、SageMakerノートブックからジョブを実行させる方法について整理しました。

rust-ndarrayを使った行列演算

Rustの行列演算ライブラリ rust-ndarray についての紹介です。ほとんどが備忘録的なコード例となります。

rust-ndarray

Rustの行列演算ライブラリはいくつかあります。rust-ndarrayもその1つで、概ねプリミティブな行列演算のみをサポートしたものです。

github.com

ndarrayクレートを追加しておきます。

[dependencies]
ndarray = "0.13.0"

ベクトル

最初にベクトル演算です。Array1型でやりとりします。

  • 定数で初期化する場合、関数arr1で生成
  • ブラケット [i] で各要素にアクセス
  • sliceメソッドにマクロ s! の値を渡すことでスライスライクに複数要素へアクセスできる
use ndarray::prelude::*;

fn main() {
    // ベクトルの定義
    let vec1 = arr1(&[1, 2, 3]);
    println!("{}", vec1);  // [1, 2, 3]
    println!("{:?}", vec1.shape());  // [3]

    // 0番目の値を参照
    println!("{}", vec1[0]);  // 1

    // 1〜2番目の値にアクセス
    println!("{}", vec1.slice(s![1..3]));  // [2, 3]

    // ベクトル同士の和
    let vec2 = arr1(&[4, 5, 6]);
    let v = &vec1 + &vec2;
    println!("{}", v);  // [5, 7, 9]

    // ベクトル同士の積
    let v = &vec1 * &vec2;
    println!("{}", v);  // [4, 10, 18]
}

行列

次に行列です。行列の場合は、Array2型となります。

  • i行j列の値にはmat[[i, j]]でアクセス
  • *はアダマール積 (要素ごとの積)で、内積はdotメソッドで計算する
  • 転置はtメソッド
use ndarray::prelude::*;

fn main() {
    // 行列の定義
    let mat1 = arr2(&[
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9]
    ]);
    println!("{}", mat1);
    // [[1, 2, 3],
    //  [4, 5, 6],
    //  [7, 8, 9]]
    println!("{:?}", mat1.shape());  // [3, 3]

    // 1行目・2列目の値にアクセス
    println!("{}", mat1[[1, 2]]);  // 6

    // 行列のアダマール積
    let mat2 = arr2(&[
        [1, 1, 1],
        [1, 2, 4],
        [1, 3, 9]
    ]);

    let m = &mat1 * &mat2;
    println!("{}", m);
    // [[1, 2, 3],
    //  [4, 10, 24],
    //  [7, 24, 81]]

    // 転置
    let m = &mat1.t();
    println!("{}", m);
    // [[1, 4, 7],
    //  [2, 5, 8],
    //  [3, 6, 9]]

    // 行列の内積
    let m = mat1.dot(&mat2);
    println!("{}", m);
    // [[6, 14, 36],
    //  [15, 32, 78],
    //  [24, 50, 120]]

    // 行列とベクトルの積 (v = Ax, このときxは列ベクトル扱い)
    let x = arr1(&[1, 2, 3]);
    let v = mat1.dot(&x);
    println!("{}", v);  // [14, 32, 50]

    // ベクトルと行列の積 (v = xA, このときxは行ベクトル扱い)
    let v = &x.dot(&mat1);
    println!("{}", v);  // [30, 36, 42]
}

高度な機能が無いので、必要に応じて自前で実装する必要があります。例えば、行列式の計算は以下のように行います。

use ndarray::prelude::*;
use ndarray::Array2;

fn determinant(mat: &mut Array2<f64>) -> Option<f64> {
    if mat.shape()[0] != mat.shape()[1] {
        return None;
    }

    let n = mat.shape()[0];

    // 上三角行列を作る
    for i in 0..(n - 1) {
        if mat[[i, i]] == 0.0 {
            for j in (i + 1)..n {
                if mat[[j, i]] != 0.0 {
                    // 置換行列を作って行交換
                    let mut perm_mat: Array2<f64> = Array2::eye(n);
                    perm_mat[[i, i]] = 0.0;
                    perm_mat[[j, i]] = 1.0;
                    perm_mat[[j, j]] = 0.0;
                    perm_mat[[i, j]] = 1.0;

                    let sign = if (j - i) % 2 == 0 { 1.0 } else { -1.0 };

                    *mat = mat.dot(&perm_mat) * sign;
                }
            }

            // 他の全ての行が0なら終了
            break;
        }

        for j in (i + 1)..n {
            let c = mat[[j, i]] / mat[[i, i]];
            for k in 0..n {
                mat[[j, k]] = mat[[j, k]] - c * mat[[i, k]];
            }
        }
    }

    // 対角の値の積
    let det_mat = mat.diag().fold(1.0, |prod, x| prod * x);

    Some(det_mat)
}

fn main() {
    let mut mat = arr2(&[
        [2.0, 1.0, 3.0, 2.0],
        [6.0, 6.0, 10.0, 7.0],
        [2.0, 7.0, 6.0, 6.0],
        [4.0, 5.0, 10.0, 9.0]
    ]);
    let det = determinant(&mut mat);
    println!("{:?}", det);  // Some(-12.0)
}

まとめ

rust-ndarrayを使った行列演算について紹介しました。逆行列や特異値分解などが必要な場合、以下のndarray-linalgなどを検討したほうが良いかもしれません。

github.com

論文メモ: High-Resolution Representations for Labeling Pixels and Regions

姿勢推定の分野でブレイクスルーとなったHRNetを、顔ランドマーク推定タスクなどに適用した High-Resolution Representations for Labeling Pixels and Regions (arXiv) についてのメモです。

@misc{sun2019highresolution,
    title={High-Resolution Representations for Labeling Pixels and Regions},
    author={Ke Sun and Yang Zhao and Borui Jiang and Tianheng Cheng and Bin Xiao and Dong Liu and Yadong Mu and Xinggang Wang and Wenyu Liu and Jingdong Wang},
    year={2019},
    eprint={1904.04514},
    archivePrefix={arXiv},
    primaryClass={cs.CV}
}

HRNet

最初にアーキテクチャのキモとなるHRNetについて説明します。

HRNet (High-Resolution Net) は、今回紹介する論文と同じ著者らによってCVPR'19にて提案されたディープラーニングネットワークアーキテクチャで、姿勢推定タスク (COCO keypoint detectionとMPII Human Pose) で当時のSotAを達成しました1

各タスクの実装 (PyTorch) はGitHubで公開されており、HRNet-Facial-Landmark-Detectionで確認したところ、学習済みのモデルで推論することは比較的簡単にできるようでした。

github.com

HRNetのポイントは2点です。

高解像度・低解像度のサブネットワークが並列

全体としては、高解像度のネットワークからスタートし、深くなるにつれて低解像度のネットワークを追加していく構造となってます。Hourglass2などの先行研究の多くは高解像度から低解像度への直列 (高解像度の情報を捨てるまたは残すだけ) という構造ですが、HRNetはいずれの解像度のネットワークも並列に学習させており、この点が従来と大きく異なります。

  • 出力層には最も高解像度の特徴マップが用いられます (後述しますが、紹介論文にてこの点に改良が加えられます)

f:id:ohke:20200314101150p:plain
[1] Figure 1.抜粋

サブネットワーク間の接続

HRNetは異なる解像度のサブネットワーク間もダウンサンプリング (高解像 -> 低解像) とアップサンプリング (低解像 -> 高解像) によって特徴マップのサイズが調整・接続 (総和) されることで、高解像度・低解像度のそれぞれが持つ情報をやりとりしています。論文中ではexchange unitsと呼んでます。

  • 下図中央でいうと 左上特徴マップのダウンサンプリング + 左中央特徴マップ + 左下特徴マップのアップサンプリング = 出力特徴マップ (右中央) となります

f:id:ohke:20200314102705p:plain
[2] Figure 3.抜粋

HRNetV2

最初に発表された姿勢推定の分野でメジャーになったのですが、何らかの位置の推定が必要なCVタスク一般に適用できるネットワークアーキテクチャとなっています。紹介する論文では、セマンティックセグメンテーション・顔ランドマーク・物体検出にHRNetを適用しています。

姿勢推定では高解像度の特徴マップのみを出力としていました (下図(a)) が、タスクごとに改良が加えられます。

  • 低解像度の特徴マップをバイリニアアップサンプリングして高解像度の特徴マップと連結する (下図(b)) ことで、セグメンテーションマップ (セマンティックセグメンテーション)やヒートマップ (顔ランドマーク検出) の推定に用いる (HRNetV2)
  • 物体検出では、HRNetV2の結果を更に平均プーリングによって何段階かダウンサンプリングし (下図(c)) 、様々なスケール (物体の大きさ) に対応させる (HRNetV2p)
    • 下図(c) の場合、上の4スケールの特徴マップ (4個の菱餅) 全てが出力となります
    • アーキテクチャ全体としては、Faster R-CNNやMask R-CNNと組み合わせて構成する

f:id:ohke:20200314120716p:plain
紹介論文 Figure 3.抜粋

実験

各タスク様々なデータセットを使って実験されており、それぞれ良いパフォーマンス (いくつかはSotA) を達成しています。

顔ランドマーク推定に絞ると、ランドマーク点数が多いWFLW3では全ての条件 (ポーズやイルミネーションなど) で高い性能となっており、ロバストな手法と伺えます。

紹介論文 Table 9.抜粋

HRNetV1とV2の比較ですが、いずれも大きな改善が見られ、出力層の修正が奏功しています。

紹介論文 Figure 4.抜粋

まとめ

今回はHRNetをセマンティックセグメンテーション・物体検出・顔ランドマーク推定へ応用した論文を紹介しました。

論文で示されてみれば、これまでdown sample -> up sampleが主流だったのが不思議に思えてきます。様々なCVタスクでSotAを達成している点からも、応用力のあるアーキテクチャということが伺えます。


  1. K. Sun, B. Xiao, D. Liu, and J. Wang. Deep high-resolution representation learning for human pose estimation. In CVPR, 2019. https://arxiv.org/abs/1902.09212

  2. A. Newell, K. Yang, and J. Deng. Stacked hourglass networks for human pose estimation. In ECCV, pages 483–499, 2016. https://arxiv.org/abs/1603.06937

  3. WFLWについては 顔ランドマークデータセットまとめ (AFLW, LFPW, COFW, 300-W, WFLW) - け日記 で触れました