け日記

SIerから転職したWebアプリエンジニアが最近のIT技術キャッチアップに四苦八苦するブログ

Python PandasをSQLみたいに使う

PandasでSQLっぽい処理(SELECT、WHERE、JOINなど)をさせてみます。

準備

前回同様、以下で提供されていますPostgreSQLのサンプルデータベースを使います。 こちらの記事を参考にリストアしました。

PostgreSQL Sample Database

前回の記事で紹介した方法で、4種類のテーブルをDataFrameに読み込みます。 いずれも主キーをインデックスにしています。

import psycopg2
import pandas as pd
import datetime
import pandas.tseries.offsets as offsets

# 接続情報
pgconfig = {
    'host': 'localhost',
    'port': '5432',
    'database': 'test',
    'user': 'ohke',
    'password': 'ohke'
}

# テーブルをDataFrameへロード
rentals = pd.read_sql(sql="SELECT * FROM rental;", con=connection, index_col='rental_id' )
customers = pd.read_sql(sql="SELECT * FROM customer;", con=connection, index_col='customer_id')
inventories = pd.read_sql(sql="SELECT * FROM inventory;", con=connection, index_col='inventory_id')
payments = pd.read_sql("SELECT * FROM payment;", con=connection, index_col='payment_id')

結合(JOIN)

まずはmergeを使ってrentalsとcustomersをJOINさせます。

各DataFrameのcustomer_idでINNER JOINさせますが、右テーブル customersの方はcustomer_idがインデックスとなっているため、right_indexオプションをTrueにする必要があります(TrueにしないとTypeErrorとなります)。
デフォルトではINNER JOINで、それ以外の方法で結合する場合は、howオプションで指定します。
また列名が2つのテーブルで重複している場合、デフォルトでは"x"や"y"などのサフィックスが付きます(suffixesオプションで任意の文字列を指定できます)。

merged_df = pd.merge(rentals, customers, left_on='customer_id', right_index=True)
# pd.merge(rentals, customers, left_on='customer_id')
# →TypeError: object of type 'NoneType' has no len()

DataFrameのクラスメソッドとしても実装されていますので、以下のようにmergeメソッドを連ねることで、rentals→inventories→filmsを1回でジョインできます。

merged_df = rentals.merge(inventories, left_on='inventory_id', right_index=True).merge(films, left_on='film_id', right_index=True)

インデックスでジョインする場合、mergeよりもjoinメソッドを使うほうが手っ取り早いです。
ただし、mergeと異なりデフォルトではLEFT OUTER JOINになること(howオプションで指定可能)、列名が重複する場合はサフィックスの指定が必須となることに注意してください(重複する場合は、ValueErrorとなります)。

joined_df = rentals.join(customers, on='customer_id', rsuffix='_customers') # 列名last_updateが重複しているため、rsuffixでサフィックスを指定する
# rentals.join(customers, on='customer_id')
# → ValueError: columns overlap but no suffix specified: Index(['last_update'], dtype='object')

値の変換(SELECT)

列の値を変換する場合は、mapメソッドを使います。

変換前の値をkey、変換後の値をvalueとするディクショナリを引数に渡すことで、指定の列(Series)の値が変換されます。

# staff_idが1なら"A Staff"、2なら"B Staff"へ変換する
store_names = {
    1: "A Staff",
    2: "B Staff"
}

rentals['staff_id'] = rentals['staff_id'].map(store_names)

行単位の計算と列の追加

DataFrameに列や行の単位で何らかの計算を行う場合、applyを使って各値に関数(またはlambda)を適用します。 以下ではrental_dateとreturn_dateが10日以上離れている場合にTrueとなるarrearsフラグの列をrentalsへ追加しています。

applyのオプションaxisで関数を適用する次元を指定しています。 axis=1とすることで行単位で計算されます(デフォルトは0で、列単位です)。
PandasでSQLを使ってDataFrameをロードすると、Timestamp(ここではrental_date)はpandas.tslib.Timestampへマッピングされます。 この型は同じくPandasが提供するpandas.tseries.offsetsを使って10日先を指定しています。 normalize=Trueとすることで、時刻部分は切り捨てられて00:00:00になります。

# applyのaxis=1を指定することで行毎に関数を適用し、得られたSeriesをrentals['arrears]に追加しています
# pandas.tseries.offsetsでは、normalize=Trueとすることで、時刻部分は切り捨てられて0になります
rentals['arrears'] = rentals.apply(lambda r: r['rental_date'] + offsets.Day(10, normalize=True) <= r['return_date'], axis=1)

条件を満たす行を取得する(WHERE)

SQLのWHERE句のように、条件を満たす行を取得したい場合、bool型のSeriesを使って行をフィルタします。 以下では、先程追加したarrearsフラグがTrueのcustomer_idのSeriesを取得しています。

# arrearsがTrueのcustomerを列挙する
arrears_rentals = rentals[rentals['arrears']]

ユニークな値の一覧を取得する(DISTINCT)

drop_duplicatesメソッドを使うと、DISTINCTのように重複を除外したユニークな値のSeriesが得られます。

arrears_customer_ids = rentals[rentals['arrears']]['customer_id'].drop_duplicates()

Seriesと一致する行を抽出する(IN)

上で取得したarrears_customer_ids(Series)を使って、customersから行を抽出します(WHERE IN(…)ですね)。 こういうケースではisinメソッドが使えます。

customers[customers.index.isin(arrears_customer_ids.values)]

集約(GROUP BYなど)

SQLで言うところのGROUP BYとCOUNTやSUMを組み合わせた集約操作についても、Pandasで同じことができます。 customer_id毎にレンタル回数(rentalsの行数)と合計料金(paymentテーブルのamountの合計)、ランク(25回以上かつ100ドル以上をA、それ以外をB)を計算し、それぞれの値をtotal_count、total_amount、rankの3つの列として追加してます。

それぞれのテーブルでgroupbyメソッドを使ってcustomer_idで分割し、countやsumメソッドでcustomer_id毎に行数や合計数を計算しています。

customers['total_count'] = rentals.groupby('customer_id')['rental_date'].count()

customers['total_amount'] = payments.groupby('customer_id')['amount'].sum() 

customers['rank'] = customers.apply(lambda c: 'A' if c['total_count'] >= 25 and c['total_amount'] else 'B' , axis=1)

集約の応用として、日毎のレンタル数とカスタマID数を計算してみます。

最初に日付をインデックスとした、空のDataFrameを生成します。
groupbyでは関数やlambdaを渡すことができます。 ここでは、rental_dateで時刻部分を切り捨て、日付部分のみで集約しています。
また、sort_valuesを使い任意の値(ここではレンタル回数)で降順ソートしています。

# 2005-05-25〜2005-05-31で1日刻みのインデックスを持つ空のDataFrameの作成
rental_distribution_df = pd.DataFrame(index=pd.date_range('2005-05-25', '2005-05-31', name='date')) 

# 日毎にレンタル回数をカウント
rental_distribution_df['rental_count'] = rentals.groupby(
    lambda i: rentals['rental_date'][i] + offsets.Day(0, normalize=True))['rental_date'].count() # lambdaではrental_dateの時刻を切り捨てている

# 日毎にカスタマIDをカウント
rental_distribution_df['customer_count'] = rentals.groupby(
    lambda i: rentals['rental_date'][i] + offsets.Day(0, normalize=True))['customer_id'].nunique() # nunique()でユニーク数を集計

# レンタル回数で降順ソート
rental_distribution_df.sort_values('rental_count', ascending=False)

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理