引き続きPySparkについてです。今回はMLパッケージを使ってスパムメッセージを分類します。
- PySpark + Jupyter Notebookの環境をDockerで構築する - け日記
- PySpark (+Jupyter Notebook) でDataFrameを扱う - け日記
- PySparkのMLパッケージを使ってMovieLensをレコメンドする - け日記
ML
MLパッケージ (pyspark.ml) は機械学習用のパッケージです。
様々な前処理 (Transformer) や 分類・回帰 (Estimator) などを行うクラスが提供されています (公式ドキュメント) 。
機械学習のタスクは多くの場合、上の処理を組み合わせて行います。例えば、スパムメッセージを分類するというタスクならば、メッセージ文のトークン化 → ストップワードの除去 → TF-IDFでベクトル化 → ナイーブベイズで分類、といった多段の処理を行う必要があります。そしてそれらを学習データとテストデータのそれぞれで行います。
Sparkでそうした多段処理を簡単に定義できるのがPipelineです (公式ドキュメント) 。イメージはscikit-learnのPipelineに近いです。
スパムメッセージの分類
スパムメッセージの分類タスクをPipelineを使って実装していきます。
環境は以下で構築したものを使います。
以下のクラスライブラリをインポートしておきます。
import pandas as pd import urllib.request import zipfile from pyspark.sql import SparkSession from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.types import * from pyspark.ml import Pipeline from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF from pyspark.ml.classification import NaiveBayes from pyspark.ml.evaluation import BinaryClassificationEvaluator
データセットのロード
最初にデータセットをダウンロードして、PySparkのDataFrameへ展開します。
データセットはUCIのMachine Learning Repositoryから入手します (参照) 。
# データセットをダウンロードして解凍 # https://archive.ics.uci.edu/ml/datasets/sms+spam+collection urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip', 'smsspamcollection.zip') with zipfile.ZipFile('./smsspamcollection.zip') as zip_file: zip_file.extractall('.') # ./SMSSpamCollection
PySparkのセッションを開いて、DataFrameにロードします。
label
にスパムなら"spam"、そうでないなら"ham"が入っています (今回のタスクで予測する変数です)body
にメッセージ本文が入っています- スパムメッセージが747通、スパムでないメッセージが4827通
# セッションを開く try: spark_session.stop() except NameError: pass spark_session = SparkSession.builder.appName( name='spark-notebook', ).master( master='local[*]', ).enableHiveSupport().getOrCreate() # DataFrameにロード schema = StructType([ StructField("label", StringType(), True), StructField('body', StringType(), True) ]) messages = spark_session.read.csv('./SMSSpamCollection', schema=schema, header=None, sep='\t') print(messages.take(3)) # [Row(label='ham', body='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...'), # Row(label='ham', body='Ok lar... Joking wif u oni...'), # Row(label='spam', body="Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's")] print(messages.groupBy('label').count().collect()) # [Row(label='ham', count=4827), Row(label='spam', count=747)]
MLパッケージを使った機械学習
5段階の処理を定義します。それぞれカッコ内のクラスを使って行います。なお、TF-IDFは2つの処理を組み合わせる必要があります。
- メッセージ本文のトークン化 (RegexTokenizer)
- ストップワードの除去 (StopWordsRemover)
- TF-IDFでベクトル化
- TFの計算 (CountVectorizer)
- IDFの計算 (IDF)
- ナイーブベイズで分類 (NaiveBayes)
ラベルを数値化
各処理を追っていく前に、文字列となっているラベルを数値に置き換えます。"ham"なら0、"spam"なら1にします。
- DataFrameのwithColumnメソッドで、カラムの値を追加したり置き換えたりできます
- withColumnの第2引数に渡しているのはUserDefinedFunctionで、ここではlabelを0 or 1の整数に置き換える関数 (replace_label_num) を定義してます
def replace_label_num(label): return int(1 if label == 'spam' else 0) messages = messages.withColumn( 'label', UserDefinedFunction(replace_label_num, IntegerType())('label') ) print(messages.head(3)) # [Row(label='ham', body='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...'), # Row(label='ham', body='Ok lar... Joking wif u oni...'), # Row(label='spam', body="Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's")]
学習データとテストデータの分離
加えて、学習用のデータとテスト用のデータを分離します。
DataFrameにはrandomSplitというメソッドが定義されており、第1引数で指定された比率 (ここでは1:1) でデータをランダムに分割してくれます。
messages_train, messages_test = messages.randomSplit([0.5, 0.5], seed=123)
トークン化
各処理を追っていきます。最初にトークン化から見ていきます。トークン化では、1つの文書から単語を切り出していきます。
RegexTokenizerを使って、引数pattern
の正規表現にマッチした場合に、1単語として切り出します。また、各単語は小文字に統一されます。
inputCol
は文書のカラム (body) 、outputCol
には分割したあとの単語リストを持つカラム (words) を指定します。データに対して前処理を行うTransformerは、inputCol
のデータを変換してoutputCol
に出力する、という共通のインタフェースを持ちます。
変換する際は、transformメソッドを使います。これも共通のインタフェースです。
出力結果を見ると、分割されて小文字に統一された単語リストがwordsに入っていることがわかります。
tokenizer = RegexTokenizer( inputCol='body', outputCol='words', pattern='\s+|[,.]' ) tmp = tokenizer.transform(messages_train) tmp.head(1) # [Row(label=0, body=' came to look at the flat, seems ok, in his 50s? * Is away alot wiv work. Got woman coming at 6.30 too.', words=['came', 'to', 'look', 'at', 'the', 'flat', 'seems', 'ok', 'in', 'his', '50s?', '*', 'is', 'away', 'alot', 'wiv', 'work', 'got', 'woman', 'coming', 'at', '6', '30', 'too'])]
ストップワードの除去
ストップワードは、代名詞や前置詞など、どの文章でも高い頻度で出現するため文章の特徴を表されない語のことです。ノイズになるので、前処理の段階でこういった語は省いてしまいます。
StopWordsRemoverを使うと、こうした語を削除してくれます。
inputCol
には、tokenizerで出力したカラムをgetOutputColメソッドを使って指定しています。
学習データを使った出力を見ると、"at"や"is"が取り除かれていることがわかります。
# ストップワードの除去 stopwrods_remover = StopWordsRemover( inputCol=tokenizer.getOutputCol(), outputCol='meaningful_words' ) tmp = stopwrods_remover.transform(tmp) print(tmp.select('meaningful_words').head(1)) # [Row(meaningful_words=['came', 'look', 'flat', 'seems', 'ok', '50s?', '*', 'away', 'alot', 'wiv', 'work', 'got', 'woman', 'coming', '6', '30'])]
TF-IDFの計算
TF-IDFでは、まずCountVectorizerで各文書内での単語の出現頻度 (TF) をカウントし、次にIDFで文書間の語の出現頻度の逆数 (IDF) を計算します。
tf_featuresを見ると、全文書の語の種類が1711個存在し、インデックスされた語の出現頻度がdictで入っています。
tfidf_featuresは、tf_featuresを使ってTF-IDFが計算された結果が、やはりdictで入っています (メソッド名がIDFなので、IDFだけ別で計算されそうですが、ちゃんとTF-IDFが計算されます) 。
# TF count_vectorizer = CountVectorizer( inputCol=stopwrods_remover.getOutputCol(), outputCol='tf_features', minDF=3.0 ) tmp = count_vectorizer.fit(tmp).transform(tmp) print(tmp.select('tf_features').head(1)) # [Row(tf_features=SparseVector(1711, {6: 1.0, 14: 1.0, 70: 1.0, 164: 1.0, 195: 1.0, 216: 1.0, 302: 1.0, 310: 1.0, 357: 1.0, 709: 1.0, 758: 1.0, 1089: 1.0, 1218: 1.0}))] # TF-IDF idf = IDF( inputCol=count_vectorizer.getOutputCol(), outputCol='tfidf_features' ) tmp = idf.fit(tmp).transform(tmp) print(tmp.select('tfidf_features').head(1)) # [Row(tfidf_features=SparseVector(1711, {6: 2.9551, 14: 3.1817, 70: 4.1567, 164: 4.7398, 195: 4.8269, 216: 5.0275, 302: 5.1453, 310: 5.1453, 357: 5.2788, 709: 5.8385, 758: 6.1261, 1089: 6.3085, 1218: 6.3085}))]
ナイーブベイズで学習
最後に、こうして前処理されたデータに対して、ナイーブベイズモデル (NaiveBayes) で学習を行います。
featuresCol
にTF-IDFのベクトル、labelCol
にラベル名を指定してモデルを作成して、モデルを作成します。
分類・回帰モデル (Estimator) はfitメソッドとtransformメソッドを持ってます。fitメソッドで学習し、transformで分類・回帰します (このあたりもscikit-learnと同じですね) 。
1行目のtrain_resultは、予測ラベル (prediction) と正解ラベル (label) が一致しています。
naive_bayes = NaiveBayes( featuresCol='tfidf_features', labelCol='label' ) model = naive_bayes.fit(tmp) train_result = model.transform(tmp) print(train_result.select(['label', 'prediction']).head(1)) # [Row(label=0, prediction=0.0)]
こうして得られたモデルを使い、まず学習データで予測精度を計測します。
2値分類ですので、BinaryClassificationEvaluatorクラスを使います。rawPredictionCol
に予測ラベル、labelCol
に正解ラベルを指定します。
transformで得られた結果 (train_result) をAUCで評価すると、0.978となりました。
evaluator = BinaryClassificationEvaluator( rawPredictionCol='prediction', labelCol='label' ) print('AUC:', evaluator.evaluate(train_result, {evaluator.metricName: 'areaUnderROC'})) # AUC: 0.9780369903320724
Pipelineで統合
最後に上で実装した処理をPipelineで1つに連結します。
引数stages
に、上で実装したTransformerとEstimatorをリストにして渡すだけです。あとは学習データを使ってfit、テストデータをtransformすると分類までやってくれます。
pipeline = Pipeline(stages=[ tokenizer, stopwrods_remover, count_vectorizer, idf, naive_bayes ]) # 学習 pipeline_transformer = pipeline.fit(messages_train) # テスト test_result = pipeline_transformer.transform(messages_test) print('AUC:', evaluator.evaluate(test_result, {evaluator.metricName: 'areaUnderROC'})) # AUC: 0.9442337630590295
まとめ
今回はスパムメッセージを題材に、前処理 (Transformer) と分類・回帰 (Estimator) からなる処理をMLパッケージを使って実装し、最後にそれらの処理をPipelineでまとめて一つにしました。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム
- 作者: Tomasz Drabas,Denny Lee,Sky株式会社玉川竜司
- 出版社/メーカー: オライリージャパン
- 発売日: 2017/11/22
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (3件) を見る