panderaでDataFrameをバリデーションする

pandasのDataFrameは柔軟なテーブル構造を提供してくれますが、時に柔軟すぎて困ることもしばしばです。本番運用するアプリケーションですとなおさらこの欠点が目立ちます。

  • 入力データに依存して意図しない型に変わってしまったり...
    • ex. [1, 2, 3]だとint64、[1, None, 3]だとfloat64で解釈される
  • そもそも意図しないフォーマットの入力データが問題なく入ってしまったり...

DataFrameのバリデーションを行うライブラリはいくつかありますが、今回は pandera を紹介します。仮説検定を行う機能 (Hypothesis) も備えていますが、用途が限定的ですので、単純な値バリデーション (Check) のみに説明を絞りたいと思います。

% python --version
Python 3.8.5

% pip list | grep pandera
pandera           0.4.4

アクセスログを集計して得られたセッションログ (session_df) を例とし、これに対してバリデーションしてみます。

import pandas as pd

# セッションID (id) がインデックス
session_df = pd.DataFrame(
    {
        # ログインしている場合は3桁, していない場合はNone
        "login_id": ["U10", "I22", None, "U05"],
        # PC, SD, APのいずれか
        "device": ["SD", "PC", "SD", "AP"],
        # 9/11ランディングのみ
        "landing_time": pd.to_datetime(
            [
                "2020-09-11T00:00:00",
                "2020-09-11T00:00:12",
                "2020-09-11T00:01:07",
                "2020-09-11T00:01:30",
            ]
        ),
        # パスは"/"開始
        "landing_path": ["/", "/page/hoge", "/", "/pages"],
        # 滞在時間は0秒以上
        "duration_secs": [121, 63, 0, 90],
    },
    index=pd.Index([1001, 1002, 1003, 1004], name="id"),
)

print(session_df)
#      login_id device        landing_time landing_path  duration_secs
# id
# 1001      U10     SD 2020-09-11 00:00:00            /            121
# 1002      I22     PC 2020-09-11 00:00:12   /page/hoge             63
# 1003     None     SD 2020-09-11 00:01:07            /              0
# 1004      U05     AP 2020-09-11 00:01:30       /pages             90

panderaでのバリデーションは2ステップです。

  • ステップ1 DataFrameSchemaでインデックスやカラムごとのルールを定義
    • checksに1つ以上 (複数の場合はlist) のルール (= _CheckBaseのサブクラス) をセット
    • pandera.Check以下にstr_lengthやisin、rangeなど様々なルールが定義されています
    • スキーマはyamlでも定義できます
  • ステップ2 validateメソッドでバリデーション
    • 成功すると、入力したDataFrameが返されます
    • 失敗すると、SchemaErrorがraiseされます
import pandera as pa

# ステップ1
session_df_schema = pa.DataFrameSchema(
    index=pa.Index(pa.Int, name="id", allow_duplicates=False),
    columns={
        "login_id": pa.Column(
            pa.String,
            nullable=True,
            checks=pa.Check.str_length(min_value=3, max_value=3),
        ),
        "device": pa.Column(pa.String, checks=pa.Check.isin(["PC", "SD", "AP"])),
        "landing_time": pa.Column(
            pa.DateTime,
            checks=pa.Check.in_range(
                min_value=pd.to_datetime("2020-09-11T00:00:00"),
                max_value=pd.to_datetime("2020-09-12T00:00:00"),
                include_max=False,
            ),
        ),
        "landing_path": pa.Column(pa.String, checks=pa.Check.str_startswith("/")),
        "duration_secs": pa.Column(
            pa.Int, checks=pa.Check.greater_than_or_equal_to(0)
        ),
    },
)

# ステップ2
session_df_schema.validate(session_df)

誤った値 ("U100"で4桁) が入っているとvalidateでSchemaErrorが投げられますが、エラーメッセージに誤ったインデックス・カラム・ルールを含みますので、その後の調査や修正もしやすいです。

session_df = pd.DataFrame(
    {
        "login_id": ["U100", "I22", None, "U05"],
    # ...

session_df_schema.validate(session_df)
# ...
# pandera.errors.SchemaError: <Schema Column: 'login_id' type=string> failed element-wise validator 0:
# <Check _str_length: str_length(3, 3)>
# failure cases:
#    index failure_case
# 0   1001         U100

CheckにSeriesを受け取る関数を渡すことで、ルールを自分で定義することもできます。landing_pathのチェックを自前で実装している例を示します。

session_df_schema = pa.DataFrameSchema(
        ...,
        # "landing_path": pa.Column(pa.String, checks=pa.Check.str_startswith("/")),
        "landing_path": pa.Column(
            pa.String, checks=pa.Check(lambda s: s.str.startswith("/"))
        ),
        ...
    },
)