Python PandasをSQLみたいに使う

PandasでSQLっぽい処理(SELECT、WHERE、JOINなど)をさせてみます。

準備

前回同様、以下で提供されていますPostgreSQLのサンプルデータベースを使います。 こちらの記事を参考にリストアしました。

PostgreSQL Sample Database

前回の記事で紹介した方法で、4種類のテーブルをDataFrameに読み込みます。 いずれも主キーをインデックスにしています。

import psycopg2
import pandas as pd
import datetime
import pandas.tseries.offsets as offsets

# 接続情報
pgconfig = {
    'host': 'localhost',
    'port': '5432',
    'database': 'test',
    'user': 'ohke',
    'password': 'ohke'
}

# 接続
connection = psycopg2.connect(**pgconfig)

# テーブルをDataFrameへロード
rentals = pd.read_sql(sql="SELECT * FROM rental;", con=connection, index_col='rental_id' )
customers = pd.read_sql(sql="SELECT * FROM customer;", con=connection, index_col='customer_id')
inventories = pd.read_sql(sql="SELECT * FROM inventory;", con=connection, index_col='inventory_id')
payments = pd.read_sql("SELECT * FROM payment;", con=connection, index_col='payment_id')

結合(JOIN)

まずはmergeを使ってrentalsとcustomersをJOINさせます。

各DataFrameのcustomer_idでINNER JOINさせますが、右テーブル customersの方はcustomer_idがインデックスとなっているため、right_indexオプションをTrueにする必要があります(TrueにしないとTypeErrorとなります)。
デフォルトではINNER JOINで、それ以外の方法で結合する場合は、howオプションで指定します。
また列名が2つのテーブルで重複している場合、デフォルトでは"x"や"y"などのサフィックスが付きます(suffixesオプションで任意の文字列を指定できます)。

merged_df = pd.merge(rentals, customers, left_on='customer_id', right_index=True)
# pd.merge(rentals, customers, left_on='customer_id')
# →TypeError: object of type 'NoneType' has no len()

DataFrameのクラスメソッドとしても実装されていますので、以下のようにmergeメソッドを連ねることで、rentals→inventories→filmsを1回でジョインできます。

merged_df = rentals.merge(inventories, left_on='inventory_id', right_index=True).merge(films, left_on='film_id', right_index=True)

インデックスでジョインする場合、mergeよりもjoinメソッドを使うほうが手っ取り早いです。
ただし、mergeと異なりデフォルトではLEFT OUTER JOINになること(howオプションで指定可能)、列名が重複する場合はサフィックスの指定が必須となることに注意してください(重複する場合は、ValueErrorとなります)。

joined_df = rentals.join(customers, on='customer_id', rsuffix='_customers') # 列名last_updateが重複しているため、rsuffixでサフィックスを指定する
# rentals.join(customers, on='customer_id')
# → ValueError: columns overlap but no suffix specified: Index(['last_update'], dtype='object')

値の変換(SELECT)

列の値を変換する場合は、mapメソッドを使います。

変換前の値をkey、変換後の値をvalueとするディクショナリを引数に渡すことで、指定の列(Series)の値が変換されます。

# staff_idが1なら"A Staff"、2なら"B Staff"へ変換する
store_names = {
    1: "A Staff",
    2: "B Staff"
}

rentals['staff_id'] = rentals['staff_id'].map(store_names)

行単位の計算と列の追加

DataFrameに列や行の単位で何らかの計算を行う場合、applyを使って各値に関数(またはlambda)を適用します。 以下ではrental_dateとreturn_dateが10日以上離れている場合にTrueとなるarrearsフラグの列をrentalsへ追加しています。

applyのオプションaxisで関数を適用する次元を指定しています。 axis=1とすることで行単位で計算されます(デフォルトは0で、列単位です)。
PandasでSQLを使ってDataFrameをロードすると、Timestamp(ここではrental_date)はpandas.tslib.Timestampへマッピングされます。 この型は同じくPandasが提供するpandas.tseries.offsetsを使って10日先を指定しています。 normalize=Trueとすることで、時刻部分は切り捨てられて00:00:00になります。

# applyのaxis=1を指定することで行毎に関数を適用し、得られたSeriesをrentals['arrears]に追加しています
# pandas.tseries.offsetsでは、normalize=Trueとすることで、時刻部分は切り捨てられて0になります
rentals['arrears'] = rentals.apply(lambda r: r['rental_date'] + offsets.Day(10, normalize=True) <= r['return_date'], axis=1)

条件を満たす行を取得する(WHERE)

SQLのWHERE句のように、条件を満たす行を取得したい場合、bool型のSeriesを使って行をフィルタします。 以下では、先程追加したarrearsフラグがTrueのcustomer_idのSeriesを取得しています。

# arrearsがTrueのcustomerを列挙する
arrears_rentals = rentals[rentals['arrears']]

ユニークな値の一覧を取得する(DISTINCT)

drop_duplicatesメソッドを使うと、DISTINCTのように重複を除外したユニークな値のSeriesが得られます。

arrears_customer_ids = rentals[rentals['arrears']]['customer_id'].drop_duplicates()

Seriesと一致する行を抽出する(IN)

上で取得したarrears_customer_ids(Series)を使って、customersから行を抽出します(WHERE IN(...)ですね)。 こういうケースではisinメソッドが使えます。

customers[customers.index.isin(arrears_customer_ids.values)]

集約(GROUP BYなど)

SQLで言うところのGROUP BYとCOUNTやSUMを組み合わせた集約操作についても、Pandasで同じことができます。 customer_id毎にレンタル回数(rentalsの行数)と合計料金(paymentテーブルのamountの合計)、ランク(25回以上かつ100ドル以上をA、それ以外をB)を計算し、それぞれの値をtotal_count、total_amount、rankの3つの列として追加してます。

それぞれのテーブルでgroupbyメソッドを使ってcustomer_idで分割し、countやsumメソッドでcustomer_id毎に行数や合計数を計算しています。

customers['total_count'] = rentals.groupby('customer_id')['rental_date'].count()

customers['total_amount'] = payments.groupby('customer_id')['amount'].sum() 

customers['rank'] = customers.apply(lambda c: 'A' if c['total_count'] >= 25 and c['total_amount'] else 'B' , axis=1)

集約の応用として、日毎のレンタル数とカスタマID数を計算してみます。

最初に日付をインデックスとした、空のDataFrameを生成します。
groupbyでは関数やlambdaを渡すことができます。 ここでは、rental_dateで時刻部分を切り捨て、日付部分のみで集約しています。
また、sort_valuesを使い任意の値(ここではレンタル回数)で降順ソートしています。

# 2005-05-25〜2005-05-31で1日刻みのインデックスを持つ空のDataFrameの作成
rental_distribution_df = pd.DataFrame(index=pd.date_range('2005-05-25', '2005-05-31', name='date')) 

# 日毎にレンタル回数をカウント
rental_distribution_df['rental_count'] = rentals.groupby(
    lambda i: rentals['rental_date'][i] + offsets.Day(0, normalize=True))['rental_date'].count() # lambdaではrental_dateの時刻を切り捨てている

# 日毎にカスタマIDをカウント
rental_distribution_df['customer_count'] = rentals.groupby(
    lambda i: rentals['rental_date'][i] + offsets.Day(0, normalize=True))['customer_id'].nunique() # nunique()でユニーク数を集計

# レンタル回数で降順ソート
rental_distribution_df.sort_values('rental_count', ascending=False)

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Python PostgreSQLのテーブルをPandasのDataFrameへ読み込む

PostgreSQLのテーブルをPandasのDataFrameに読み込む方法の備忘録です。

今回も以下で提供されているサンプルデータを使っています。
PostgreSQL Sample Database

まずはPandasと、PostgreSQLのドライバとしてpsycopg2をインポートして、DBに接続しています。

import pandas as pd
import psycopg2

# 接続情報
connection_config = {
    'host': 'localhost',
    'port': '5432',
    'database': 'test',
    'user': 'ohke',
    'password': 'ohke'
}

# 接続
connection = psycopg2.connect(**connection_config)

次にDataFrameへロードします。 Pandasのread_sqlメソッドに先程接続したコネクションとSQL(文字列)を渡すことで、SELECT結果をDataFrameとして返してくれます。

引数index_colでインデックスを指定しています。 この引数が無い場合は、1から自動的に採番されます。

# DataFrameでロード
rentals = pd.read_sql(sql="SELECT * FROM rental;", con=connection, index_col='rental_id' )

# 表示
rentals

Jupyterでは以下のように表形式で表示されます。 テーブルのカラム名がそのままDataFrameのカラム名として使われていること、インデックスとしてrental_idが使われていることが確認できます。

f:id:ohke:20170623085109p:plain

普通のDataFrameですので、例えばカラム名でSeriesを取り出すこともできます。

rentals['inventory_id']
rental_id
2        1525
3        1711
4        2452
5        2079
6        2792
7        3995
8        2346
9        2580

時刻はTimestamp型にマッピングされるようです。

rentals['rental_date'][1]
Timestamp('2005-05-24 22:53:30')

また、統計値も取得できます(あまり意味のある値ではありませんが)。

rentals.describe()

f:id:ohke:20170623090607p:plain:w350

C# HttpClientでKeep-Aliveを無効にする

仕事で利用していたクラウドサービスのAPIの仕様で、リクエスト都度で認証する必要があり、ハマったので備忘録にしておきます。

当初は以下のようにHttpClientでAPIをコールしていたのですが、一度認証されるとそのセッションが使いまわされてしまいます。
HttpClientをDisposeすることでもコネクションを破棄することもできるのですが、それはそれでソケットの枯渇という別の問題も生みます。

開発者を苦しめる.NETのHttpClientのバグと紛らわしいドキュメント

// HttpClientインスタンスはstatic変数として使いまわす
private static HttpClient httpClient = new HttpClient()

// リクエスト生成
var request = new HttpRequestMessage
{
    Method = HttpMethod.Post,
    RequestUri = new Uri("http://ohke.hateblo.jp/")
};

// Basic認証ヘッダを付与する
request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue(
    "Basic",
    Convert.ToBase64String(Encoding.ASCII.GetBytes(string.Format("{0}:{1}", userName, userPassword))));

// リクエストの送信
var response = await httpClient.SendAsync(request);

リクエスト都度認証させるためにはKeep-Aliveを無効にして、HTTPのConnectionヘッダをCloseにする必要があります(これに気付かされるのも時間がかかったのですが)。

HttpWebRequestにはKeepAliveプロパティをfalseにすれば良いのですが、HttpClientを使う場合はHttpRequestMessageのHeadersプロパティ(またはHttpClientのDefaultRequestHeaders)に直接書き加える必要があります。

これでリクエストヘッダがConnection: Closeとなり、Keep-Aliveが無効になります。

// Basic認証ヘッダを付与する
request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue(
    "Basic",
    Convert.ToBase64String(Encoding.ASCII.GetBytes(string.Format("{0}:{1}", userName, userPassword))));

// Keep-Aliveをオフにする(リクエスト都度認証させる)
request.Headers.Add("Connection", "close");

// リクエストの送信
var response = await httpClient.SendAsync(request);

通常は一度認証したセッションは使いまわすべきなので、レアケースかとは思います。