PySparkでMLを使って機械学習する

引き続きPySparkについてです。今回はMLパッケージを使ってスパムメッセージを分類します。

ML

MLパッケージ (pyspark.ml) は機械学習用のパッケージです。

様々な前処理 (Transformer) や 分類・回帰 (Estimator) などを行うクラスが提供されています (公式ドキュメント) 。

機械学習のタスクは多くの場合、上の処理を組み合わせて行います。例えば、スパムメッセージを分類するというタスクならば、メッセージ文のトークン化 → ストップワードの除去 → TF-IDFでベクトル化 → ナイーブベイズで分類、といった多段の処理を行う必要があります。そしてそれらを学習データとテストデータのそれぞれで行います。
Sparkでそうした多段処理を簡単に定義できるのがPipelineです (公式ドキュメント) 。イメージはscikit-learnのPipelineに近いです。

スパムメッセージの分類

スパムメッセージの分類タスクをPipelineを使って実装していきます。

環境は以下で構築したものを使います。

ohke.hateblo.jp

以下のクラスライブラリをインポートしておきます。

import pandas as pd
import urllib.request
import zipfile

from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator

データセットのロード

最初にデータセットをダウンロードして、PySparkのDataFrameへ展開します。

データセットはUCIのMachine Learning Repositoryから入手します (参照) 。

# データセットをダウンロードして解凍
# https://archive.ics.uci.edu/ml/datasets/sms+spam+collection
urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip', 'smsspamcollection.zip')
with zipfile.ZipFile('./smsspamcollection.zip') as zip_file:
    zip_file.extractall('.')  # ./SMSSpamCollection

PySparkのセッションを開いて、DataFrameにロードします。

  • labelにスパムなら"spam"、そうでないなら"ham"が入っています (今回のタスクで予測する変数です)
  • bodyにメッセージ本文が入っています
  • スパムメッセージが747通、スパムでないメッセージが4827通
# セッションを開く
try:
    spark_session.stop()
except NameError:
    pass

spark_session = SparkSession.builder.appName(
    name='spark-notebook',
).master(
    master='local[*]',
).enableHiveSupport().getOrCreate()

# DataFrameにロード
schema = StructType([
    StructField("label", StringType(), True),
    StructField('body', StringType(), True)
])

messages = spark_session.read.csv('./SMSSpamCollection', schema=schema, header=None, sep='\t')

print(messages.take(3))
# [Row(label='ham', body='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...'), 
#  Row(label='ham', body='Ok lar... Joking wif u oni...'), 
#  Row(label='spam', body="Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's")]

print(messages.groupBy('label').count().collect())
# [Row(label='ham', count=4827), Row(label='spam', count=747)]

MLパッケージを使った機械学習

5段階の処理を定義します。それぞれカッコ内のクラスを使って行います。なお、TF-IDFは2つの処理を組み合わせる必要があります。

ラベルを数値化

各処理を追っていく前に、文字列となっているラベルを数値に置き換えます。"ham"なら0、"spam"なら1にします。

  • DataFrameのwithColumnメソッドで、カラムの値を追加したり置き換えたりできます
  • withColumnの第2引数に渡しているのはUserDefinedFunctionで、ここではlabelを0 or 1の整数に置き換える関数 (replace_label_num) を定義してます
def replace_label_num(label):
    return int(1 if label == 'spam' else 0)

messages = messages.withColumn(
    'label', 
    UserDefinedFunction(replace_label_num, IntegerType())('label')
)

print(messages.head(3))
# [Row(label='ham', body='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...'), 
#  Row(label='ham', body='Ok lar... Joking wif u oni...'), 
#  Row(label='spam', body="Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's")]

学習データとテストデータの分離

加えて、学習用のデータとテスト用のデータを分離します。

DataFrameにはrandomSplitというメソッドが定義されており、第1引数で指定された比率 (ここでは1:1) でデータをランダムに分割してくれます。

messages_train, messages_test = messages.randomSplit([0.5, 0.5], seed=123)

トークン化

各処理を追っていきます。最初にトークン化から見ていきます。トークン化では、1つの文書から単語を切り出していきます。

RegexTokenizerを使って、引数patternの正規表現にマッチした場合に、1単語として切り出します。また、各単語は小文字に統一されます。
inputColは文書のカラム (body) 、outputColには分割したあとの単語リストを持つカラム (words) を指定します。データに対して前処理を行うTransformerは、inputColのデータを変換してoutputColに出力する、という共通のインタフェースを持ちます。

変換する際は、transformメソッドを使います。これも共通のインタフェースです。

出力結果を見ると、分割されて小文字に統一された単語リストがwordsに入っていることがわかります。

tokenizer = RegexTokenizer(
    inputCol='body',
    outputCol='words',
    pattern='\s+|[,.]'
)

tmp = tokenizer.transform(messages_train)
tmp.head(1)
# [Row(label=0, body=' came to look at the flat, seems ok, in his 50s? * Is away alot wiv work. Got woman coming at 6.30 too.', words=['came', 'to', 'look', 'at', 'the', 'flat', 'seems', 'ok', 'in', 'his', '50s?', '*', 'is', 'away', 'alot', 'wiv', 'work', 'got', 'woman', 'coming', 'at', '6', '30', 'too'])]

ストップワードの除去

ストップワードは、代名詞や前置詞など、どの文章でも高い頻度で出現するため文章の特徴を表されない語のことです。ノイズになるので、前処理の段階でこういった語は省いてしまいます。

StopWordsRemoverを使うと、こうした語を削除してくれます。
inputColには、tokenizerで出力したカラムをgetOutputColメソッドを使って指定しています。

学習データを使った出力を見ると、"at"や"is"が取り除かれていることがわかります。

# ストップワードの除去
stopwrods_remover = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol='meaningful_words'
)

tmp = stopwrods_remover.transform(tmp)
print(tmp.select('meaningful_words').head(1))
# [Row(meaningful_words=['came', 'look', 'flat', 'seems', 'ok', '50s?', '*', 'away', 'alot', 'wiv', 'work', 'got', 'woman', 'coming', '6', '30'])]

TF-IDFの計算

TF-IDFでは、まずCountVectorizerで各文書内での単語の出現頻度 (TF) をカウントし、次にIDFで文書間の語の出現頻度の逆数 (IDF) を計算します。

tf_featuresを見ると、全文書の語の種類が1711個存在し、インデックスされた語の出現頻度がdictで入っています。
tfidf_featuresは、tf_featuresを使ってTF-IDFが計算された結果が、やはりdictで入っています (メソッド名がIDFなので、IDFだけ別で計算されそうですが、ちゃんとTF-IDFが計算されます) 。

# TF
count_vectorizer = CountVectorizer(
    inputCol=stopwrods_remover.getOutputCol(), 
    outputCol='tf_features',
    minDF=3.0
)

tmp = count_vectorizer.fit(tmp).transform(tmp)
print(tmp.select('tf_features').head(1))
# [Row(tf_features=SparseVector(1711, {6: 1.0, 14: 1.0, 70: 1.0, 164: 1.0, 195: 1.0, 216: 1.0, 302: 1.0, 310: 1.0, 357: 1.0, 709: 1.0, 758: 1.0, 1089: 1.0, 1218: 1.0}))]

# TF-IDF
idf = IDF(
    inputCol=count_vectorizer.getOutputCol(),
    outputCol='tfidf_features'
)

tmp = idf.fit(tmp).transform(tmp)
print(tmp.select('tfidf_features').head(1))
# [Row(tfidf_features=SparseVector(1711, {6: 2.9551, 14: 3.1817, 70: 4.1567, 164: 4.7398, 195: 4.8269, 216: 5.0275, 302: 5.1453, 310: 5.1453, 357: 5.2788, 709: 5.8385, 758: 6.1261, 1089: 6.3085, 1218: 6.3085}))]

ナイーブベイズで学習

最後に、こうして前処理されたデータに対して、ナイーブベイズモデル (NaiveBayes) で学習を行います。

featuresColにTF-IDFのベクトル、labelColにラベル名を指定してモデルを作成して、モデルを作成します。
分類・回帰モデル (Estimator) はfitメソッドとtransformメソッドを持ってます。fitメソッドで学習し、transformで分類・回帰します (このあたりもscikit-learnと同じですね) 。

1行目のtrain_resultは、予測ラベル (prediction) と正解ラベル (label) が一致しています。

naive_bayes = NaiveBayes(
    featuresCol='tfidf_features',
    labelCol='label'
)

model = naive_bayes.fit(tmp)
train_result = model.transform(tmp)

print(train_result.select(['label', 'prediction']).head(1))
# [Row(label=0, prediction=0.0)]

こうして得られたモデルを使い、まず学習データで予測精度を計測します。

2値分類ですので、BinaryClassificationEvaluatorクラスを使います。rawPredictionColに予測ラベル、labelColに正解ラベルを指定します。

transformで得られた結果 (train_result) をAUCで評価すると、0.978となりました。

evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='prediction',
    labelCol='label'
)

print('AUC:', evaluator.evaluate(train_result, {evaluator.metricName: 'areaUnderROC'}))
# AUC: 0.9780369903320724

Pipelineで統合

最後に上で実装した処理をPipelineで1つに連結します。

引数stagesに、上で実装したTransformerとEstimatorをリストにして渡すだけです。あとは学習データを使ってfit、テストデータをtransformすると分類までやってくれます。

pipeline = Pipeline(stages=[
    tokenizer,
    stopwrods_remover,
    count_vectorizer,
    idf,
    naive_bayes
])

# 学習
pipeline_transformer = pipeline.fit(messages_train)

# テスト
test_result = pipeline_transformer.transform(messages_test)
print('AUC:', evaluator.evaluate(test_result, {evaluator.metricName: 'areaUnderROC'}))
# AUC: 0.9442337630590295

まとめ

今回はスパムメッセージを題材に、前処理 (Transformer) と分類・回帰 (Estimator) からなる処理をMLパッケージを使って実装し、最後にそれらの処理をPipelineでまとめて一つにしました。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム