小ネタ: PandasでCSV文字列を分割して列にする

PandasでCSV形式の文字列のカラムを、それをカンマ区切りで分割して、別々の列にする方法のメモです。
例えば、1行目なら"123"と"456"と"789"を3つの列に分割します。

import pandas as pd

df = pd.DataFrame({'name': ['A', 'B'], 'csv': ['123,456,789', 'abc,def,ghi']})

Series.str.splitメソッドを使うと、3つの列に分割されたDataFrame (tmp) が生成されます。最後にもとのDataFrame (df) に3つの列を追加しています。

  • 第1引数は区切り文字
  • expand引数は列に展開するかどうか (Falseの場合、列は分割されずリストになる)
# カンマ区切りで分割
tmp = df['csv'].str.split(',', expand=True)

# 列を追加
df['value1'] = tmp[0]
df['value2'] = tmp[1]
df['value3'] = tmp[2]

Flask-CachingでRedisにキャッシュする

Flaskアプリケーションでビューを楽にキャッシュする方法はないかと探していた時、同僚にFlask-Cachingを紹介されました。
Flask-Cachingを使ってRedisにキャッシュする方法について整理します。

Flask-Caching

Flask-Cachingは以下の特徴があり、Flaskアプリケーションに容易に導入できます (公式ドキュメント) 。

  • デコレータで簡単にキャッシュの設定・定義ができる
  • Redis、memcached、ファイルなど、複数の種類のキャッシュミドルウェアを同じインタフェースで利用できる

github.com

Redisの構築 (準備)

キャッシュ先として利用するRedis環境をDockerで構築しておきます。

>  docker run -d -p 6379:6379 redis --requirepass pass1234

サンプルアプリケーションの実装 (準備)

必要なパッケージをインストールしておきます。

> pip install flask==1.0.2 Flask-Caching==1.4.0 redis==2.10.6

今回題材とするFlaskアプリケーションの実装は以下です。

  • /<key>/<value>にPOSTリクエストすると、dictオブジェクトに保存されます (ここではグローバル変数DICTIONARY、実際のサービスではDBなどが使われるでしょう)
  • /<key>にGETリクエストすると、永続化層からkeyの値を取得して返します

FLASK_APP=run.py ./venv/bin/flask runで起動します。

from flask import Flask

DICTIONARY = {}  # DBでもなんでも良いです

app = Flask(__name__)


@app.route('/<key>')
def get_value(key):
    print(f'get_value({key})')
    return f'key={key}, value={DICTIONARY[key]}'


@app.route('/<key>/<value>', methods=['POST'])
def set_value(key, value):
    print(f'set_value({key},{value})')
    DICTIONARY[key] = value
    return 'OK'

Flask-Cachingの導入

それではFlask-Cachingをアプリケーションへ導入します。目的は/<key>へのGETアクセスのレスポンスをキャッシュすることです。ポイントは2点です。

  • Cacheインスタンスを作成します
    • 上で構築したRedisサーバの設定をconfigに渡し、Flaskインスタンス (app) と紐付けます
    • 設定項目の一覧は、こちら
  • キャッシュしたいハンドラをデコレータ (@cache.cached()) で指定します
from flask import Flask
from flask_caching import Cache

DICTIONARY = {}  # DBでもなんでも良いです

app = Flask(__name__)

# Cacheインスタンスの作成
cache = Cache(app, config={
    'CACHE_TYPE': 'redis',
    'CACHE_DEFAULT_TIMEOUT': 60,
    'CACHE_REDIS_HOST': 'localhost',
    'CACHE_REDIS_PORT': 6379,
    'CACHE_REDIS_PASSWORD': 'pass1234',
    'CACHE_REDIS_DB': '0'
})


@app.route('/<key>')
@cache.cached(timeout=30)  # 30秒間レスポンスをキャッシュする
def get_value(key):
    print(f'get_value({key})')
    return f'key={key}, value={DICTIONARY[key]}'

# 以下略

起動して、/key1/1にPOSTしてデータを作り、/key1にGETすると値1が取得できます。このときキャッシュが行われます。
最初のアクセスではprintで出力されますが、2回目以降のアクセスでは出力されません。30秒 (timeoutの設定値) が経過後に、GETリクエストするとキャッシュは捨てられるのでDICTIONARYから取得してビューが生成されます (print文も出力されます) 。

GET後にRedisに接続して中を見ると、キーflask_cache_view//key1に対して、値!\x80\x03X\x11\x00\x00\x00key=key1, value=1q\x00.が作られていることがわかります。
キーは、デフォルトではflask_cache_view/ + パスとなっています。この内、flask_cache_はCacheインスタンス作成時の設定項目の一つ、CACHE_KEY_PREFIXで変更できます。

127.0.0.1:6379> KEYS *
1) "flask_cache_view//key1"
127.0.0.1:6379> GET flask_cache_view//key1
"!\x80\x03X\x11\x00\x00\x00key=key1, value=1q\x00."

cachedデコレータにはいくつかパラメータがあります。

  • timeout: キャッシュの有効期間 (秒)
  • key_prefix: その名の通りキーのプリフィックスで、デフォルトでは"view/リクエストパス"
  • query_string: Trueの場合、クエリパラメータが異なるリクエストは、クエリパラメータを含むハッシュ値をキーとしてキャッシュされます (デフォルトはFalseで、クエリパラメータが異なっても同一のキーが使われる)

キャッシュの削除

次はキャッシュを削除することを考えます。
上のままでは、GETしてキャッシュをした後30秒間は常にキャッシュの値が返されます。30秒以内にPOSTして値が更新されたとしても、30秒が経過するまで更新後の値を取得できません (古い値が返されます) 。

キャッシュの削除はCacheオブジェクトのdeleteメソッドで行います。POSTでキャッシュを削除するように変更したset_valueメソッドを示します。

Redisのキーは flask_cache_view//+key という形式となっています。flask_cache_ (CACHE_KEY_PREFIX) の部分はdeleteメソッド内で補完されますので、cache.delete(f'view//{key}')とすることでGETで設定されたキーを削除できます。

@app.route('/<key>/<value>', methods=['POST'])
def set_value(key, value):
    print(f'set_value({key},{value})')
    DICTIONARY[key] = value

    cache.delete(f'view//{key}')

    return 'OK'

まとめ

FlaskでレスポンスをRedisにキャッシュする簡単な方法として、Flask-Cachingを紹介しました。

PySparkでMLを使って機械学習する

引き続きPySparkについてです。今回はMLパッケージを使ってスパムメッセージを分類します。

ML

MLパッケージ (pyspark.ml) は機械学習用のパッケージです。

様々な前処理 (Transformer) や 分類・回帰 (Estimator) などを行うクラスが提供されています (公式ドキュメント) 。

機械学習のタスクは多くの場合、上の処理を組み合わせて行います。例えば、スパムメッセージを分類するというタスクならば、メッセージ文のトークン化 → ストップワードの除去 → TF-IDFでベクトル化 → ナイーブベイズで分類、といった多段の処理を行う必要があります。そしてそれらを学習データとテストデータのそれぞれで行います。
Sparkでそうした多段処理を簡単に定義できるのがPipelineです (公式ドキュメント) 。イメージはscikit-learnのPipelineに近いです。

スパムメッセージの分類

スパムメッセージの分類タスクをPipelineを使って実装していきます。

環境は以下で構築したものを使います。

ohke.hateblo.jp

以下のクラスライブラリをインポートしておきます。

import pandas as pd
import urllib.request
import zipfile

from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator

データセットのロード

最初にデータセットをダウンロードして、PySparkのDataFrameへ展開します。

データセットはUCIのMachine Learning Repositoryから入手します (参照) 。

# データセットをダウンロードして解凍
# https://archive.ics.uci.edu/ml/datasets/sms+spam+collection
urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip', 'smsspamcollection.zip')
with zipfile.ZipFile('./smsspamcollection.zip') as zip_file:
    zip_file.extractall('.')  # ./SMSSpamCollection

PySparkのセッションを開いて、DataFrameにロードします。

  • labelにスパムなら"spam"、そうでないなら"ham"が入っています (今回のタスクで予測する変数です)
  • bodyにメッセージ本文が入っています
  • スパムメッセージが747通、スパムでないメッセージが4827通
# セッションを開く
try:
    spark_session.stop()
except NameError:
    pass

spark_session = SparkSession.builder.appName(
    name='spark-notebook',
).master(
    master='local[*]',
).enableHiveSupport().getOrCreate()

# DataFrameにロード
schema = StructType([
    StructField("label", StringType(), True),
    StructField('body', StringType(), True)
])

messages = spark_session.read.csv('./SMSSpamCollection', schema=schema, header=None, sep='\t')

print(messages.take(3))
# [Row(label='ham', body='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...'), 
#  Row(label='ham', body='Ok lar... Joking wif u oni...'), 
#  Row(label='spam', body="Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's")]

print(messages.groupBy('label').count().collect())
# [Row(label='ham', count=4827), Row(label='spam', count=747)]

MLパッケージを使った機械学習

5段階の処理を定義します。それぞれカッコ内のクラスを使って行います。なお、TF-IDFは2つの処理を組み合わせる必要があります。

ラベルを数値化

各処理を追っていく前に、文字列となっているラベルを数値に置き換えます。"ham"なら0、"spam"なら1にします。

  • DataFrameのwithColumnメソッドで、カラムの値を追加したり置き換えたりできます
  • withColumnの第2引数に渡しているのはUserDefinedFunctionで、ここではlabelを0 or 1の整数に置き換える関数 (replace_label_num) を定義してます
def replace_label_num(label):
    return int(1 if label == 'spam' else 0)

messages = messages.withColumn(
    'label', 
    UserDefinedFunction(replace_label_num, IntegerType())('label')
)

print(messages.head(3))
# [Row(label='ham', body='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...'), 
#  Row(label='ham', body='Ok lar... Joking wif u oni...'), 
#  Row(label='spam', body="Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's")]

学習データとテストデータの分離

加えて、学習用のデータとテスト用のデータを分離します。

DataFrameにはrandomSplitというメソッドが定義されており、第1引数で指定された比率 (ここでは1:1) でデータをランダムに分割してくれます。

messages_train, messages_test = messages.randomSplit([0.5, 0.5], seed=123)

トークン化

各処理を追っていきます。最初にトークン化から見ていきます。トークン化では、1つの文書から単語を切り出していきます。

RegexTokenizerを使って、引数patternの正規表現にマッチした場合に、1単語として切り出します。また、各単語は小文字に統一されます。
inputColは文書のカラム (body) 、outputColには分割したあとの単語リストを持つカラム (words) を指定します。データに対して前処理を行うTransformerは、inputColのデータを変換してoutputColに出力する、という共通のインタフェースを持ちます。

変換する際は、transformメソッドを使います。これも共通のインタフェースです。

出力結果を見ると、分割されて小文字に統一された単語リストがwordsに入っていることがわかります。

tokenizer = RegexTokenizer(
    inputCol='body',
    outputCol='words',
    pattern='\s+|[,.]'
)

tmp = tokenizer.transform(messages_train)
tmp.head(1)
# [Row(label=0, body=' came to look at the flat, seems ok, in his 50s? * Is away alot wiv work. Got woman coming at 6.30 too.', words=['came', 'to', 'look', 'at', 'the', 'flat', 'seems', 'ok', 'in', 'his', '50s?', '*', 'is', 'away', 'alot', 'wiv', 'work', 'got', 'woman', 'coming', 'at', '6', '30', 'too'])]

ストップワードの除去

ストップワードは、代名詞や前置詞など、どの文章でも高い頻度で出現するため文章の特徴を表されない語のことです。ノイズになるので、前処理の段階でこういった語は省いてしまいます。

StopWordsRemoverを使うと、こうした語を削除してくれます。
inputColには、tokenizerで出力したカラムをgetOutputColメソッドを使って指定しています。

学習データを使った出力を見ると、"at"や"is"が取り除かれていることがわかります。

# ストップワードの除去
stopwrods_remover = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol='meaningful_words'
)

tmp = stopwrods_remover.transform(tmp)
print(tmp.select('meaningful_words').head(1))
# [Row(meaningful_words=['came', 'look', 'flat', 'seems', 'ok', '50s?', '*', 'away', 'alot', 'wiv', 'work', 'got', 'woman', 'coming', '6', '30'])]

TF-IDFの計算

TF-IDFでは、まずCountVectorizerで各文書内での単語の出現頻度 (TF) をカウントし、次にIDFで文書間の語の出現頻度の逆数 (IDF) を計算します。

tf_featuresを見ると、全文書の語の種類が1711個存在し、インデックスされた語の出現頻度がdictで入っています。
tfidf_featuresは、tf_featuresを使ってTF-IDFが計算された結果が、やはりdictで入っています (メソッド名がIDFなので、IDFだけ別で計算されそうですが、ちゃんとTF-IDFが計算されます) 。

# TF
count_vectorizer = CountVectorizer(
    inputCol=stopwrods_remover.getOutputCol(), 
    outputCol='tf_features',
    minDF=3.0
)

tmp = count_vectorizer.fit(tmp).transform(tmp)
print(tmp.select('tf_features').head(1))
# [Row(tf_features=SparseVector(1711, {6: 1.0, 14: 1.0, 70: 1.0, 164: 1.0, 195: 1.0, 216: 1.0, 302: 1.0, 310: 1.0, 357: 1.0, 709: 1.0, 758: 1.0, 1089: 1.0, 1218: 1.0}))]

# TF-IDF
idf = IDF(
    inputCol=count_vectorizer.getOutputCol(),
    outputCol='tfidf_features'
)

tmp = idf.fit(tmp).transform(tmp)
print(tmp.select('tfidf_features').head(1))
# [Row(tfidf_features=SparseVector(1711, {6: 2.9551, 14: 3.1817, 70: 4.1567, 164: 4.7398, 195: 4.8269, 216: 5.0275, 302: 5.1453, 310: 5.1453, 357: 5.2788, 709: 5.8385, 758: 6.1261, 1089: 6.3085, 1218: 6.3085}))]

ナイーブベイズで学習

最後に、こうして前処理されたデータに対して、ナイーブベイズモデル (NaiveBayes) で学習を行います。

featuresColにTF-IDFのベクトル、labelColにラベル名を指定してモデルを作成して、モデルを作成します。
分類・回帰モデル (Estimator) はfitメソッドとtransformメソッドを持ってます。fitメソッドで学習し、transformで分類・回帰します (このあたりもscikit-learnと同じですね) 。

1行目のtrain_resultは、予測ラベル (prediction) と正解ラベル (label) が一致しています。

naive_bayes = NaiveBayes(
    featuresCol='tfidf_features',
    labelCol='label'
)

model = naive_bayes.fit(tmp)
train_result = model.transform(tmp)

print(train_result.select(['label', 'prediction']).head(1))
# [Row(label=0, prediction=0.0)]

こうして得られたモデルを使い、まず学習データで予測精度を計測します。

2値分類ですので、BinaryClassificationEvaluatorクラスを使います。rawPredictionColに予測ラベル、labelColに正解ラベルを指定します。

transformで得られた結果 (train_result) をAUCで評価すると、0.978となりました。

evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='prediction',
    labelCol='label'
)

print('AUC:', evaluator.evaluate(train_result, {evaluator.metricName: 'areaUnderROC'}))
# AUC: 0.9780369903320724

Pipelineで統合

最後に上で実装した処理をPipelineで1つに連結します。

引数stagesに、上で実装したTransformerとEstimatorをリストにして渡すだけです。あとは学習データを使ってfit、テストデータをtransformすると分類までやってくれます。

pipeline = Pipeline(stages=[
    tokenizer,
    stopwrods_remover,
    count_vectorizer,
    idf,
    naive_bayes
])

# 学習
pipeline_transformer = pipeline.fit(messages_train)

# テスト
test_result = pipeline_transformer.transform(messages_test)
print('AUC:', evaluator.evaluate(test_result, {evaluator.metricName: 'areaUnderROC'}))
# AUC: 0.9442337630590295

まとめ

今回はスパムメッセージを題材に、前処理 (Transformer) と分類・回帰 (Estimator) からなる処理をMLパッケージを使って実装し、最後にそれらの処理をPipelineでまとめて一つにしました。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム