PySparkのMLパッケージを使ってMovieLensをレコメンドする

前回・前々回と引き続き、PySparkを使っていきます。

今回はMLパッケージを使って映画のレコメンドを行います。

データセットにはおなじみMovieLens 100kを使います。

grouplens.org

MLパッケージ (pyspark.ml)

PySparkには機械学習用のパッケージが2種類提供されています。

いずれも主要な機能を持っていますが、2つは扱うデータのインタフェースが異なっており、MLibはRDD、MLはDataFrameとなっています。
Spark 2.0以降、MLibの方はメンテナンスのみ (将来的には非推奨) のため、これからであればMLを使うのが筋のようです。今回もMLを使います。

MLパッケージは、データの変換や特徴量の抽出などの前処理、分類タスクや回帰タスク、それらを一連の処理へ統合するPipeline (scikit-learnのPipelineと似たようなもの) など、機械学習に必要な機能が提供されてます。
今回は、recommendationモジュールを使います。

MovieLens 100kのレコメンド

それでは実装をしていきます。前々回に構築した環境を想定してます。

ohke.hateblo.jp

MovieLens 100kのダウンロードとDataFrameへの展開

まずMovieLens 100kのデータセットをダウンロードし、直下のディレクトリ (./ml-100k/) に解凍します。

import pandas as pd
import urllib.request
import zipfile

# MovieLens100kをダウンロードして解凍
urllib.request.urlretrieve('http://files.grouplens.org/datasets/movielens/ml-100k.zip', 'ml-100k.zip')
with zipfile.ZipFile('./ml-100k.zip') as zip_file:
    zip_file.extractall('.')

そこからまず映画のデータを./ml-100k/u.itemからPandasのDataFrameにロードします。

import codecs

# pd.DataFrameへ映画のデータを読み込む
with codecs.open('ml-100k/u.item', 'r', 'utf-8', errors='ignore') as f:
    movies = pd.read_table(f, delimiter='|', header=None).iloc[:, 0:2]
    movies.rename(columns={0: 'item', 1: 'item_title'}, inplace=True)

# 3本を抽出 (あとで使います)
movies[movies['item'].isin([127, 187, 94])]

f:id:ohke:20180908230119p:plain

次に、レイティングのデータを./ml-100k/u.dataからsparkのDataFrameに直接ロードします。前回同様に、sparkのセッションを開いて、ローカルのCSVファイルから読み込みます。

from pyspark.sql import SparkSession
from pyspark.sql.types import *

# セッションを開く
spark_session = SparkSession.builder.appName(
    name='spark-notebook',
).master(
    master='local[*]',
).enableHiveSupport().getOrCreate()


# スキーマを定義
schema = StructType([
    StructField("user", LongType(), True),
    StructField('item', LongType(), True),
    StructField('rating', LongType(), True)
])

# sparkのDataFrameを作成
ratings = spark_session.read.csv('./ml-100k/u.data', schema=schema, header=None, sep='\t')
print(ratings.head()) # Row(user=196, item=242, rating=3)

ALSモデルの作成と学習

recommendationモジュールには、ALSのみが実装されています。
ALS (Alternative Least Squares: 交互最小二乗法) は、レイティング行列をNMF (非負値行列因子分解) する方法の一つです。NMFによるレコメンドは以前このブログでも取り上げましたので、詳細はこちらをご覧ください。

ohke.hateblo.jp

実装は以下です。ALSのクラスを生成し、fitメソッドで学習させています。ALSのパラメータは次のとおりです (が、maxIterregParamについてはよくわかってないので、次の投稿で深掘りする予定です) 。

  • rank: 行列分解するときの次元数で、アイテムとユーザの特徴ベクトルの次元数となります
  • maxIter: イテレーション回数
  • regParam: 正則化の強さ(0-1)
  • userCol, itemCol, ratingCol: ユーザ、アイテム、レイティングの各カラム名
from pyspark.ml.recommendation import ALS

# モデルの作成
als = ALS(
    rank=20, 
    maxIter=10,
    regParam=0.1,
    userCol='user',
    itemCol='item',
    ratingCol='rating',
    seed=0
)

# 学習
model = als.fit(ratings)

アイテム同士のベクトルの比較

得られたモデルから、アイテムのベクトルを抽出して比較したところ、"Godfather, The (1972)"と"Godfather: Part II, The (1974)"の類似度は0.96だったのに対して、"Godfather, The (1972)"と"Home Alone (1990)"の類似度は0.65となりました。ALSによって映画間の類似が表現できていることが定性的にわかります。

  • 各アイテムのベクトルはitemFactorsプロパティで取得
import numpy as np

# 94: Home Alone (1990)
# 127: Godfather, The (1972)
# 187: Godfather: Part II, The (1974)
tmp = model.itemFactors.filter('id IN (94, 127, 187)').orderBy('id').select('features').collect()

homealone_vec = np.array(tmp[0][0])
godfather1_vec = np.array(tmp[1][0])
godfather2_vec = np.array(tmp[2][0])

# ベクトルの次元数を確認
print(godfather1_vec.shape) # (20,)

# 映画のベクトル同士を比較 (コサイン類似度)
print((godfather1_vec @ godfather2_vec) / (np.linalg.norm(godfather1_vec) * np.linalg.norm(godfather2_vec)))
# 0.955007102215
print((godfather1_vec @ homealone_vec) / (np.linalg.norm(godfather1_vec) * np.linalg.norm(homealone_vec)))
# 0.649085345764

ユーザにレコメンド

最後に、得られたモデルを使ってユーザへおすすめの映画をランキング化します。

ここでは30番のユーザを使います。このユーザが最高評価 (5点) を付けている映画の一覧を見てみると、ヒューマンドラマ系が嗜好が偏っているようです。

# ユーザ:30の高評価映画
tmp = ratings.filter('user = 30 AND rating = 5').toPandas()
pd.merge(tmp, movies, on='item')

このユーザにおすすめトップ10を計算します。やはりヒューマンドラマ系の映画が高評価と予測されていることがわかります。

  • recommendationsがarrayとなっていますので、selectで展開してます
# ユーザ:30のおすすめ
tmp = model.recommendForAllUsers(
    numItems=10
).filter(
    'user = 30'
).select(
    'recommendations.item', 
    'recommendations.rating'
).first()

recommends = pd.DataFrame({
    'item': tmp[0],
    'rating': tmp[1]
})

pd.merge(recommends, movies, on='item')

f:id:ohke:20180908230155p:plain

まとめ

PySparkのMLパッケージを使って、MovieLens 100kのデータを使って映画のレコメンドを行いました。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

PySpark (+Jupyter Notebook) でDataFrameを扱う

前回の投稿では、PySparkをJupyter Notebookから操作するための環境を作りました。

ohke.hateblo.jp

今回は上の環境を使って、PySparkでDataFrameを扱う方法についてまとめます。
(そのため上の環境構築が済んでいる前提となります。)

SparkのDataFrame

Sparkで、分散させるデータを扱うためのAPIが3種類あります。

  1. RDD
  2. DataFrame
  3. DataSet

このうち、PySparkでできるのはRDD or DataFrameとなります (DataSet APIは型安全性を確保するのがメインテーマであるため、Scala/Javaのみサポートされてます) 。

ではどちらを使うべきかですが、DataFrame APIを使うのが良いそうです。
というのも、DataFrame APIはCatalyst Optimizerがラップされており、これがクエリの実行計画を最適化するため、Scala/Javaと比較しても遜色ない程度のパフォーマンスが得られるためです (RDDではpy4jによって発生するPythonとJVMの間のコンテキストスイッチに対して最適化されていないので、Scala/Javaと比較して格段に遅くなりやすかったのです) 。

DataFrameの作成

最初にpysparkをインストールしておきます。

$ pip install pyspark

今回はUCIから提供されている肺がんデータセットをダウンロードして使います。

http://mlr.cs.umass.edu/ml/datasets/Breast+Cancer+Wisconsin+%28Original%29

ダウンロードしたCSVファイルをDataFrameを作りますが、その前にSparkSessionでセッションを作成します。
インタラクティブシェルであれば起動時に自動的にセッションが作成あれ、sparkオブジェクトを介してアクセスできます。インタラクティブシェルを使わない場合 (Jupyter Notebookなど) は、以下のように自前でセッションを確立します。

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import urllib.request

# 肺がんデータセットをダウンロード
urllib.request.urlretrieve('http://mlr.cs.umass.edu/ml/machine-learning-databases/breast-cancer-wisconsin/breast-cancer-wisconsin.data', 'breast-cancer-wisconsin.data')

# セッションの作成
spark_session = SparkSession.builder.getOrCreate()
# セッションを切るときは spark_session.stop()

上で確立したSparkSessionを使って、DataFrameを作成します。

  • スキーマはStructTypeで定義します
  • CSVファイルから作成しますので、DataFrameReaderクラスのcsvメソッドにファイルパス (とスキーマ定義) を渡してます
# CSVファイルのスキーマを定義
data_schema = StructType([
    StructField("id", StringType(), False),
    StructField('clump_thickness', LongType(), True),
    StructField('uniformity_of_cell_size', LongType(), True),
    StructField('uniformity_of_cell_shape', LongType(), True),
    StructField('marginal_adhesion', LongType(), True),
    StructField('single_epithelial_cell_size', LongType(), True),
    StructField('bare_nuclei', LongType(), True),
    StructField('bland_chromatin', LongType(), True),
    StructField('normal_nucleoli', LongType(), True),
    StructField('mitoses', LongType(), True),
    StructField('classification', LongType(), True),
])

# DataFrameの作成
data = spark_session.read.csv('breast-cancer-wisconsin.data', schema=data_schema)
data.show(1)

#+-------+---------------+-----------------------+------------------------+-----------------+---------------------------+-----------+---------------+---------------+-------+--------------+
|     #id|clump_thickness|uniformity_of_cell_size|uniformity_of_cell_shape|marginal_adhesion|single_epithelial_cell_size|bare_nuclei|bland_chromatin|normal_nucleoli|mitoses|classification|
#+-------+---------------+-----------------------+------------------------+-----------------+---------------------------+-----------+---------------+---------------+-------+--------------+
#|1000025|              5|                      1|                       1|                1|                          2|          1|              3|              1|      1|             2|
#+-------+---------------+-----------------------+------------------------+-----------------+---------------------------+-----------+---------------+---------------+-------+--------------+
#only showing top 1 row

DataFrameへのクエリ

DataFrameへのクエリは、DataFrameオブジェクトのAPIを使う方法と、SQLで記述する方法の2種類があります。

DataFrameオブジェクトのAPI

作成したDataFrameオブジェクトのAPIを使ってクエリする例は以下です。

メソッドチェーンで選択、射影、結合を記述します。それぞれfilterselectjoinのメソッドで行われます。
アクション (showcounttakecollectなど) を実行するまでは遅延評価されます。C#のLINQなどにイメージは近いです。

# id列だけ5件を表示する
data.select('id').show(5)
# +-------+
# |     id|
# +-------+
# |1000025|
# |1002945|
# |1015425|
# |1016277|
# |1017023|
# +-------+

# classificationが4の行数を返す
data.filter("classification == 4").count() # 241

# idの末尾が0で、かつ、classificationが4の行を3行取得
data.select('id', 'classification').filter("id like '%0' and classification == 4").take(3)
# [Row(id='1047630', classification=4),
#  Row(id='1050670', classification=4),
#  Row(id='1054590', classification=4)]

# classificationが2ならば"benign", 4ならば"malignant"とするJSONファイルを作成
with open('label.json', mode='w') as f:
    f.write("""
    [
        { "classification": 2, "label": "benign" },
        { "classification": 4, "label": "malignant" }
    ]
    """)

# JSONファイルからDataFrameを作成
label = spark_session.read.json('label.json', multiLine=True)

# dataとjoinする
data.join(label, data.classification == label.classification).select('id', 'label').take(3)
# [Row(id='1000025', label='benign'),
#  Row(id='1002945', label='benign'),
#  Row(id='1015425', label='benign')]

SQLを使ったクエリ

セッションを使って、SQLで記述することもできます。

SQLの場合、DataFrameから一時テーブルを作成する必要があります。ここでは、createOrReplaceTempViewメソッドで一時テーブル化してます。
SQLでは、作成時に渡された名前でテーブルを参照します。

# 一時テーブルを作成
data.createOrReplaceTempView('cancer')
label.createOrReplaceTempView('label')

spark_session.sql("""
select id, label
from cancer 
    inner join label
        on cancer.classification = label.classification
where id like '%0'
""").take(3)
# [Row(id='1047630', label='malignant'),
#  Row(id='1050670', label='malignant'),
#  Row(id='1054590', label='malignant'),
#  Row(id='1071760', label='benign'),
#  Row(id='1074610', label='benign')]

まとめ

今回はPySparkでDataFrameを扱う方法について整理しました。最初にセッションを確立し、DataFrameのAPI or SQLで操作できることを確認しました。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

PySpark + Jupyter Notebookの環境をDockerで構築する

お仕事でSparkをJupyter Notebook (Python 3) から使うかもしれないということで、PySparkの実験用環境をDockerで構築する方法について調べました。

今回は、PySpark APIを使ったJupyter Notebookの動作確認をローカルで完結できるようにすることをゴールにします (別ノードで計算させようとすると、sparkmagicやlivyの導入・設定が必要となるようです) 。

最初にpyspark-notebookイメージをプルして起動します。このイメージは、Jupyter Notebookのイメージ (jupyter/scipy-notebook) をベースに、Sparkのダウンロードや環境変数の設定 (SPARK_HOMESPARK_OPTSなど) が行われています。

https://hub.docker.com/r/jupyter/pyspark-notebook/

$ docker pull jupyter/pyspark-notebook

$ docker images jupyter/pyspark-notebook
REPOSITORY                 TAG                 IMAGE ID            CREATED             SIZE
jupyter/pyspark-notebook   latest              50a39e5a0bf6        3 days ago          5.45GB

$ docker run -p 8890:8888 -v /tmp/pyspark-notebook:/home/jovyan/work jupyter/pyspark-notebook start-notebook.sh

次に、Webブラウザを開いて、起動したJupyter Notebookへアクセスします (上の場合ですと、"http://localhost:8890"です) 。トークンが聞かれますので、起動時のコンソールで表示されるトークンを入力しましょう (最初からトークン付きURL "ttp://localhost:8890/?token=xxxxxxxx" でアクセスしても良いです) 。

Jupyter Notebook上でTerminalを開き、pysparkのシェルが起動できることを確認します。また"sc"でSparkContextインスタンスにもアクセスできることも確認できます。

では、Jupyter NotebookでPython 3のファイルを新規作成し、手始めにSparkContextを表示できることを確認します。

あとはPySparkのAPIで計算できるようになります (公式ドキュメントはこちら) 。

data = [1, 2, 3, 4, 5]

rdd = sc.parallelize(data)

rdd.sum()  # 15