引き続きPySparkについてです。今回はMLパッケージを使ってスパムメッセージを分類します。
ML
MLパッケージ (pyspark.ml) は機械学習用のパッケージです。
様々な前処理 (Transformer) や 分類・回帰 (Estimator) などを行うクラスが提供されています (公式ドキュメント) 。
機械学習のタスクは多くの場合、上の処理を組み合わせて行います。例えば、スパムメッセージを分類するというタスクならば、メッセージ文のトークン化 → ストップワードの除去 → TF-IDFでベクトル化 → ナイーブベイズで分類、といった多段の処理を行う必要があります。そしてそれらを学習データとテストデータのそれぞれで行います。
Sparkでそうした多段処理を簡単に定義できるのがPipelineです (公式ドキュメント) 。イメージはscikit-learnのPipelineに近いです。
スパムメッセージの分類
スパムメッセージの分類タスクをPipelineを使って実装していきます。
環境は以下で構築したものを使います。
ohke.hateblo.jp
以下のクラスライブラリをインポートしておきます。
import pandas as pd
import urllib.request
import zipfile
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator
データセットのロード
最初にデータセットをダウンロードして、PySparkのDataFrameへ展開します。
データセットはUCIのMachine Learning Repositoryから入手します (参照) 。
urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip', 'smsspamcollection.zip')
with zipfile.ZipFile('./smsspamcollection.zip') as zip_file:
zip_file.extractall('.')
PySparkのセッションを開いて、DataFrameにロードします。
label
にスパムなら"spam"、そうでないなら"ham"が入っています (今回のタスクで予測する変数です)
body
にメッセージ本文が入っています
- スパムメッセージが747通、スパムでないメッセージが4827通
try:
spark_session.stop()
except NameError:
pass
spark_session = SparkSession.builder.appName(
name='spark-notebook',
).master(
master='local[*]',
).enableHiveSupport().getOrCreate()
schema = StructType([
StructField("label", StringType(), True),
StructField('body', StringType(), True)
])
messages = spark_session.read.csv('./SMSSpamCollection', schema=schema, header=None, sep='\t')
print(messages.take(3))
print(messages.groupBy('label').count().collect())
MLパッケージを使った機械学習
5段階の処理を定義します。それぞれカッコ内のクラスを使って行います。なお、TF-IDFは2つの処理を組み合わせる必要があります。
ラベルを数値化
各処理を追っていく前に、文字列となっているラベルを数値に置き換えます。"ham"なら0、"spam"なら1にします。
- DataFrameのwithColumnメソッドで、カラムの値を追加したり置き換えたりできます
- withColumnの第2引数に渡しているのはUserDefinedFunctionで、ここではlabelを0 or 1の整数に置き換える関数 (replace_label_num) を定義してます
def replace_label_num(label):
return int(1 if label == 'spam' else 0)
messages = messages.withColumn(
'label',
UserDefinedFunction(replace_label_num, IntegerType())('label')
)
print(messages.head(3))
学習データとテストデータの分離
加えて、学習用のデータとテスト用のデータを分離します。
DataFrameにはrandomSplitというメソッドが定義されており、第1引数で指定された比率 (ここでは1:1) でデータをランダムに分割してくれます。
messages_train, messages_test = messages.randomSplit([0.5, 0.5], seed=123)
トークン化
各処理を追っていきます。最初にトークン化から見ていきます。トークン化では、1つの文書から単語を切り出していきます。
RegexTokenizerを使って、引数pattern
の正規表現にマッチした場合に、1単語として切り出します。また、各単語は小文字に統一されます。
inputCol
は文書のカラム (body) 、outputCol
には分割したあとの単語リストを持つカラム (words) を指定します。データに対して前処理を行うTransformerは、inputCol
のデータを変換してoutputCol
に出力する、という共通のインタフェースを持ちます。
変換する際は、transformメソッドを使います。これも共通のインタフェースです。
出力結果を見ると、分割されて小文字に統一された単語リストがwordsに入っていることがわかります。
tokenizer = RegexTokenizer(
inputCol='body',
outputCol='words',
pattern='\s+|[,.]'
)
tmp = tokenizer.transform(messages_train)
tmp.head(1)
ストップワードの除去
ストップワードは、代名詞や前置詞など、どの文章でも高い頻度で出現するため文章の特徴を表されない語のことです。ノイズになるので、前処理の段階でこういった語は省いてしまいます。
StopWordsRemoverを使うと、こうした語を削除してくれます。
inputCol
には、tokenizerで出力したカラムをgetOutputColメソッドを使って指定しています。
学習データを使った出力を見ると、"at"や"is"が取り除かれていることがわかります。
stopwrods_remover = StopWordsRemover(
inputCol=tokenizer.getOutputCol(),
outputCol='meaningful_words'
)
tmp = stopwrods_remover.transform(tmp)
print(tmp.select('meaningful_words').head(1))
TF-IDFの計算
TF-IDFでは、まずCountVectorizerで各文書内での単語の出現頻度 (TF) をカウントし、次にIDFで文書間の語の出現頻度の逆数 (IDF) を計算します。
tf_featuresを見ると、全文書の語の種類が1711個存在し、インデックスされた語の出現頻度がdictで入っています。
tfidf_featuresは、tf_featuresを使ってTF-IDFが計算された結果が、やはりdictで入っています (メソッド名がIDFなので、IDFだけ別で計算されそうですが、ちゃんとTF-IDFが計算されます) 。
count_vectorizer = CountVectorizer(
inputCol=stopwrods_remover.getOutputCol(),
outputCol='tf_features',
minDF=3.0
)
tmp = count_vectorizer.fit(tmp).transform(tmp)
print(tmp.select('tf_features').head(1))
idf = IDF(
inputCol=count_vectorizer.getOutputCol(),
outputCol='tfidf_features'
)
tmp = idf.fit(tmp).transform(tmp)
print(tmp.select('tfidf_features').head(1))
ナイーブベイズで学習
最後に、こうして前処理されたデータに対して、ナイーブベイズモデル (NaiveBayes) で学習を行います。
featuresCol
にTF-IDFのベクトル、labelCol
にラベル名を指定してモデルを作成して、モデルを作成します。
分類・回帰モデル (Estimator) はfitメソッドとtransformメソッドを持ってます。fitメソッドで学習し、transformで分類・回帰します (このあたりもscikit-learnと同じですね) 。
1行目のtrain_resultは、予測ラベル (prediction) と正解ラベル (label) が一致しています。
naive_bayes = NaiveBayes(
featuresCol='tfidf_features',
labelCol='label'
)
model = naive_bayes.fit(tmp)
train_result = model.transform(tmp)
print(train_result.select(['label', 'prediction']).head(1))
こうして得られたモデルを使い、まず学習データで予測精度を計測します。
2値分類ですので、BinaryClassificationEvaluatorクラスを使います。rawPredictionCol
に予測ラベル、labelCol
に正解ラベルを指定します。
transformで得られた結果 (train_result) をAUCで評価すると、0.978となりました。
evaluator = BinaryClassificationEvaluator(
rawPredictionCol='prediction',
labelCol='label'
)
print('AUC:', evaluator.evaluate(train_result, {evaluator.metricName: 'areaUnderROC'}))
Pipelineで統合
最後に上で実装した処理をPipelineで1つに連結します。
引数stages
に、上で実装したTransformerとEstimatorをリストにして渡すだけです。あとは学習データを使ってfit、テストデータをtransformすると分類までやってくれます。
pipeline = Pipeline(stages=[
tokenizer,
stopwrods_remover,
count_vectorizer,
idf,
naive_bayes
])
pipeline_transformer = pipeline.fit(messages_train)
test_result = pipeline_transformer.transform(messages_test)
print('AUC:', evaluator.evaluate(test_result, {evaluator.metricName: 'areaUnderROC'}))
まとめ
今回はスパムメッセージを題材に、前処理 (Transformer) と分類・回帰 (Estimator) からなる処理をMLパッケージを使って実装し、最後にそれらの処理をPipelineでまとめて一つにしました。