PySparkのMLパッケージを使ってMovieLensをレコメンドする

前回・前々回と引き続き、PySparkを使っていきます。

今回はMLパッケージを使って映画のレコメンドを行います。

データセットにはおなじみMovieLens 100kを使います。

grouplens.org

MLパッケージ (pyspark.ml)

PySparkには機械学習用のパッケージが2種類提供されています。

いずれも主要な機能を持っていますが、2つは扱うデータのインタフェースが異なっており、MLibはRDD、MLはDataFrameとなっています。
Spark 2.0以降、MLibの方はメンテナンスのみ (将来的には非推奨) のため、これからであればMLを使うのが筋のようです。今回もMLを使います。

MLパッケージは、データの変換や特徴量の抽出などの前処理、分類タスクや回帰タスク、それらを一連の処理へ統合するPipeline (scikit-learnのPipelineと似たようなもの) など、機械学習に必要な機能が提供されてます。
今回は、recommendationモジュールを使います。

MovieLens 100kのレコメンド

それでは実装をしていきます。前々回に構築した環境を想定してます。

ohke.hateblo.jp

MovieLens 100kのダウンロードとDataFrameへの展開

まずMovieLens 100kのデータセットをダウンロードし、直下のディレクトリ (./ml-100k/) に解凍します。

import pandas as pd
import urllib.request
import zipfile

# MovieLens100kをダウンロードして解凍
urllib.request.urlretrieve('http://files.grouplens.org/datasets/movielens/ml-100k.zip', 'ml-100k.zip')
with zipfile.ZipFile('./ml-100k.zip') as zip_file:
    zip_file.extractall('.')

そこからまず映画のデータを./ml-100k/u.itemからPandasのDataFrameにロードします。

import codecs

# pd.DataFrameへ映画のデータを読み込む
with codecs.open('ml-100k/u.item', 'r', 'utf-8', errors='ignore') as f:
    movies = pd.read_table(f, delimiter='|', header=None).iloc[:, 0:2]
    movies.rename(columns={0: 'item', 1: 'item_title'}, inplace=True)

# 3本を抽出 (あとで使います)
movies[movies['item'].isin([127, 187, 94])]

f:id:ohke:20180908230119p:plain

次に、レイティングのデータを./ml-100k/u.dataからsparkのDataFrameに直接ロードします。前回同様に、sparkのセッションを開いて、ローカルのCSVファイルから読み込みます。

from pyspark.sql import SparkSession
from pyspark.sql.types import *

# セッションを開く
spark_session = SparkSession.builder.appName(
    name='spark-notebook',
).master(
    master='local[*]',
).enableHiveSupport().getOrCreate()


# スキーマを定義
schema = StructType([
    StructField("user", LongType(), True),
    StructField('item', LongType(), True),
    StructField('rating', LongType(), True)
])

# sparkのDataFrameを作成
ratings = spark_session.read.csv('./ml-100k/u.data', schema=schema, header=None, sep='\t')
print(ratings.head()) # Row(user=196, item=242, rating=3)

ALSモデルの作成と学習

recommendationモジュールには、ALSのみが実装されています。
ALS (Alternative Least Squares: 交互最小二乗法) は、レイティング行列をNMF (非負値行列因子分解) する方法の一つです。NMFによるレコメンドは以前このブログでも取り上げましたので、詳細はこちらをご覧ください。

ohke.hateblo.jp

実装は以下です。ALSのクラスを生成し、fitメソッドで学習させています。ALSのパラメータは次のとおりです (が、maxIterregParamについてはよくわかってないので、次の投稿で深掘りする予定です) 。

  • rank: 行列分解するときの次元数で、アイテムとユーザの特徴ベクトルの次元数となります
  • maxIter: イテレーション回数
  • regParam: 正則化の強さ(0-1)
  • userCol, itemCol, ratingCol: ユーザ、アイテム、レイティングの各カラム名
from pyspark.ml.recommendation import ALS

# モデルの作成
als = ALS(
    rank=20, 
    maxIter=10,
    regParam=0.1,
    userCol='user',
    itemCol='item',
    ratingCol='rating',
    seed=0
)

# 学習
model = als.fit(ratings)

アイテム同士のベクトルの比較

得られたモデルから、アイテムのベクトルを抽出して比較したところ、"Godfather, The (1972)"と"Godfather: Part II, The (1974)"の類似度は0.96だったのに対して、"Godfather, The (1972)"と"Home Alone (1990)"の類似度は0.65となりました。ALSによって映画間の類似が表現できていることが定性的にわかります。

  • 各アイテムのベクトルはitemFactorsプロパティで取得
import numpy as np

# 94: Home Alone (1990)
# 127: Godfather, The (1972)
# 187: Godfather: Part II, The (1974)
tmp = model.itemFactors.filter('id IN (94, 127, 187)').orderBy('id').select('features').collect()

homealone_vec = np.array(tmp[0][0])
godfather1_vec = np.array(tmp[1][0])
godfather2_vec = np.array(tmp[2][0])

# ベクトルの次元数を確認
print(godfather1_vec.shape) # (20,)

# 映画のベクトル同士を比較 (コサイン類似度)
print((godfather1_vec @ godfather2_vec) / (np.linalg.norm(godfather1_vec) * np.linalg.norm(godfather2_vec)))
# 0.955007102215
print((godfather1_vec @ homealone_vec) / (np.linalg.norm(godfather1_vec) * np.linalg.norm(homealone_vec)))
# 0.649085345764

ユーザにレコメンド

最後に、得られたモデルを使ってユーザへおすすめの映画をランキング化します。

ここでは30番のユーザを使います。このユーザが最高評価 (5点) を付けている映画の一覧を見てみると、ヒューマンドラマ系が嗜好が偏っているようです。

# ユーザ:30の高評価映画
tmp = ratings.filter('user = 30 AND rating = 5').toPandas()
pd.merge(tmp, movies, on='item')

このユーザにおすすめトップ10を計算します。やはりヒューマンドラマ系の映画が高評価と予測されていることがわかります。

  • recommendationsがarrayとなっていますので、selectで展開してます
# ユーザ:30のおすすめ
tmp = model.recommendForAllUsers(
    numItems=10
).filter(
    'user = 30'
).select(
    'recommendations.item', 
    'recommendations.rating'
).first()

recommends = pd.DataFrame({
    'item': tmp[0],
    'rating': tmp[1]
})

pd.merge(recommends, movies, on='item')

f:id:ohke:20180908230155p:plain

まとめ

PySparkのMLパッケージを使って、MovieLens 100kのデータを使って映画のレコメンドを行いました。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム