PySpark (+Jupyter Notebook) でDataFrameを扱う

前回の投稿では、PySparkをJupyter Notebookから操作するための環境を作りました。

ohke.hateblo.jp

今回は上の環境を使って、PySparkでDataFrameを扱う方法についてまとめます。
(そのため上の環境構築が済んでいる前提となります。)

SparkのDataFrame

Sparkで、分散させるデータを扱うためのAPIが3種類あります。

  1. RDD
  2. DataFrame
  3. DataSet

このうち、PySparkでできるのはRDD or DataFrameとなります (DataSet APIは型安全性を確保するのがメインテーマであるため、Scala/Javaのみサポートされてます) 。

ではどちらを使うべきかですが、DataFrame APIを使うのが良いそうです。
というのも、DataFrame APIはCatalyst Optimizerがラップされており、これがクエリの実行計画を最適化するため、Scala/Javaと比較しても遜色ない程度のパフォーマンスが得られるためです (RDDではpy4jによって発生するPythonとJVMの間のコンテキストスイッチに対して最適化されていないので、Scala/Javaと比較して格段に遅くなりやすかったのです) 。

DataFrameの作成

最初にpysparkをインストールしておきます。

$ pip install pyspark

今回はUCIから提供されている肺がんデータセットをダウンロードして使います。

http://mlr.cs.umass.edu/ml/datasets/Breast+Cancer+Wisconsin+%28Original%29

ダウンロードしたCSVファイルをDataFrameを作りますが、その前にSparkSessionでセッションを作成します。
インタラクティブシェルであれば起動時に自動的にセッションが作成あれ、sparkオブジェクトを介してアクセスできます。インタラクティブシェルを使わない場合 (Jupyter Notebookなど) は、以下のように自前でセッションを確立します。

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import urllib.request

# 肺がんデータセットをダウンロード
urllib.request.urlretrieve('http://mlr.cs.umass.edu/ml/machine-learning-databases/breast-cancer-wisconsin/breast-cancer-wisconsin.data', 'breast-cancer-wisconsin.data')

# セッションの作成
spark_session = SparkSession.builder.getOrCreate()
# セッションを切るときは spark_session.stop()

上で確立したSparkSessionを使って、DataFrameを作成します。

  • スキーマはStructTypeで定義します
  • CSVファイルから作成しますので、DataFrameReaderクラスのcsvメソッドにファイルパス (とスキーマ定義) を渡してます
# CSVファイルのスキーマを定義
data_schema = StructType([
    StructField("id", StringType(), False),
    StructField('clump_thickness', LongType(), True),
    StructField('uniformity_of_cell_size', LongType(), True),
    StructField('uniformity_of_cell_shape', LongType(), True),
    StructField('marginal_adhesion', LongType(), True),
    StructField('single_epithelial_cell_size', LongType(), True),
    StructField('bare_nuclei', LongType(), True),
    StructField('bland_chromatin', LongType(), True),
    StructField('normal_nucleoli', LongType(), True),
    StructField('mitoses', LongType(), True),
    StructField('classification', LongType(), True),
])

# DataFrameの作成
data = spark_session.read.csv('breast-cancer-wisconsin.data', schema=data_schema)
data.show(1)

#+-------+---------------+-----------------------+------------------------+-----------------+---------------------------+-----------+---------------+---------------+-------+--------------+
|     #id|clump_thickness|uniformity_of_cell_size|uniformity_of_cell_shape|marginal_adhesion|single_epithelial_cell_size|bare_nuclei|bland_chromatin|normal_nucleoli|mitoses|classification|
#+-------+---------------+-----------------------+------------------------+-----------------+---------------------------+-----------+---------------+---------------+-------+--------------+
#|1000025|              5|                      1|                       1|                1|                          2|          1|              3|              1|      1|             2|
#+-------+---------------+-----------------------+------------------------+-----------------+---------------------------+-----------+---------------+---------------+-------+--------------+
#only showing top 1 row

DataFrameへのクエリ

DataFrameへのクエリは、DataFrameオブジェクトのAPIを使う方法と、SQLで記述する方法の2種類があります。

DataFrameオブジェクトのAPI

作成したDataFrameオブジェクトのAPIを使ってクエリする例は以下です。

メソッドチェーンで選択、射影、結合を記述します。それぞれfilterselectjoinのメソッドで行われます。
アクション (showcounttakecollectなど) を実行するまでは遅延評価されます。C#のLINQなどにイメージは近いです。

# id列だけ5件を表示する
data.select('id').show(5)
# +-------+
# |     id|
# +-------+
# |1000025|
# |1002945|
# |1015425|
# |1016277|
# |1017023|
# +-------+

# classificationが4の行数を返す
data.filter("classification == 4").count() # 241

# idの末尾が0で、かつ、classificationが4の行を3行取得
data.select('id', 'classification').filter("id like '%0' and classification == 4").take(3)
# [Row(id='1047630', classification=4),
#  Row(id='1050670', classification=4),
#  Row(id='1054590', classification=4)]

# classificationが2ならば"benign", 4ならば"malignant"とするJSONファイルを作成
with open('label.json', mode='w') as f:
    f.write("""
    [
        { "classification": 2, "label": "benign" },
        { "classification": 4, "label": "malignant" }
    ]
    """)

# JSONファイルからDataFrameを作成
label = spark_session.read.json('label.json', multiLine=True)

# dataとjoinする
data.join(label, data.classification == label.classification).select('id', 'label').take(3)
# [Row(id='1000025', label='benign'),
#  Row(id='1002945', label='benign'),
#  Row(id='1015425', label='benign')]

SQLを使ったクエリ

セッションを使って、SQLで記述することもできます。

SQLの場合、DataFrameから一時テーブルを作成する必要があります。ここでは、createOrReplaceTempViewメソッドで一時テーブル化してます。
SQLでは、作成時に渡された名前でテーブルを参照します。

# 一時テーブルを作成
data.createOrReplaceTempView('cancer')
label.createOrReplaceTempView('label')

spark_session.sql("""
select id, label
from cancer 
    inner join label
        on cancer.classification = label.classification
where id like '%0'
""").take(3)
# [Row(id='1047630', label='malignant'),
#  Row(id='1050670', label='malignant'),
#  Row(id='1054590', label='malignant'),
#  Row(id='1071760', label='benign'),
#  Row(id='1074610', label='benign')]

まとめ

今回はPySparkでDataFrameを扱う方法について整理しました。最初にセッションを確立し、DataFrameのAPI or SQLで操作できることを確認しました。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

PySpark + Jupyter Notebookの環境をDockerで構築する

お仕事でSparkをJupyter Notebook (Python 3) から使うかもしれないということで、PySparkの実験用環境をDockerで構築する方法について調べました。

今回は、PySpark APIを使ったJupyter Notebookの動作確認をローカルで完結できるようにすることをゴールにします (別ノードで計算させようとすると、sparkmagicやlivyの導入・設定が必要となるようです) 。

最初にpyspark-notebookイメージをプルして起動します。このイメージは、Jupyter Notebookのイメージ (jupyter/scipy-notebook) をベースに、Sparkのダウンロードや環境変数の設定 (SPARK_HOMESPARK_OPTSなど) が行われています。

https://hub.docker.com/r/jupyter/pyspark-notebook/

$ docker pull jupyter/pyspark-notebook

$ docker images jupyter/pyspark-notebook
REPOSITORY                 TAG                 IMAGE ID            CREATED             SIZE
jupyter/pyspark-notebook   latest              50a39e5a0bf6        3 days ago          5.45GB

$ docker run -p 8890:8888 -v /tmp/pyspark-notebook:/home/jovyan/work jupyter/pyspark-notebook start-notebook.sh

次に、Webブラウザを開いて、起動したJupyter Notebookへアクセスします (上の場合ですと、"http://localhost:8890"です) 。トークンが聞かれますので、起動時のコンソールで表示されるトークンを入力しましょう (最初からトークン付きURL "ttp://localhost:8890/?token=xxxxxxxx" でアクセスしても良いです) 。

Jupyter Notebook上でTerminalを開き、pysparkのシェルが起動できることを確認します。また"sc"でSparkContextインスタンスにもアクセスできることも確認できます。

では、Jupyter NotebookでPython 3のファイルを新規作成し、手始めにSparkContextを表示できることを確認します。

あとはPySparkのAPIで計算できるようになります (公式ドキュメントはこちら) 。

data = [1, 2, 3, 4, 5]

rdd = sc.parallelize(data)

rdd.sum()  # 15

Pandasで正規表現を使って検索する

Pandasで正規表現を使って、マッチする行を検索する方法です。

textという列を1つ持つDataFrameをサンプルデータとして作成します。

import pandas as pd
import re

df = pd.DataFrame({'text': ['a', 'aaa', 'abc', 'def']})

f:id:ohke:20180818151533p:plain

PandasのSeriesは、文字列の便利な操作をまとめたstrアクセサを提供しています。

https://pandas.pydata.org/pandas-docs/stable/text.html

例えば、多言語の文字列型などでもよくあるcontains()やstartswith()などが使えます。

# "a"を含む
df[df['text'].str.contains('a')]
#   text
# 0    a
# 1  aaa
# 2  abc

# "ab"で始まる
df[df['text'].str.startswith('ab')]
#   text
# 2  abc

正規表現で検索する場合は、match()を使います。例えば、"a.+"にマッチする行を選択する場合は、以下のように行います。

df[df['text'].str.match(r'a.+')]
#   text
# 1  aaa
# 2  abc

# コンパイル済みのパターンを渡すこともできます
pattern = re.compile('a.+')
df[df['text'].str.match(pattern)]
#   text
# 1  aaa
# 2  abc