SQSを永遠にポーリングするPythonパッケージ sqs-polling を作りました

タイトルの通りで、SQSを永遠にポーリングするPythonパッケージを作ってPyPiに公開しました。

pypi.org

GitHubはこちら。

github.com

使い方

pip install sqs-pollingでインストール。あとは以下のように記述すると、延々とSQSからメッセージを取り出し、コールバックを実行してくれます。最小コードでSQSのデーモンを実現できることが最大の売りです。

from sqs_polling import sqs_polling

def your_callback(message, greeting):
    print(greeting + message["name"])
    return True

your_queue_url="https://sqs.your-region.amazonaws.com/XXXXXXXXXXXX/your-sqs",
sqs_polling(queue_url=your_queue_url, callback=your_callback, callback_args={"greeting": "Hello, "})

その他のお便利ポイントとしては3点あります。

  • SQSメッセージの本文 $.Messages.Body を勝手に取り出してコールバックの引数にしてくれる (オプションでメッセージ全体も取り出し可能)
  • コールバックの返り値をTrueとするだけでSQSからメッセージを削除してくれます (逆にFalseとするとメッセージが残せますので再処理なども自由にコントロールできます)
  • マルチスレッド・マルチプロセスにも対応

モチベーション

AWSにおいて、コンポネント間の通信を疎結合にし、かつ、再処理が容易なアーキテクチャを手軽に実現するために、SQSを採用しているシステムが多いかと思います。

SQSを受信する側は、SQSからメッセージを取り出し -> 何らかの加工や更新を行い -> 正常に終わったらメッセージを削除する を繰り返すデーモン的な処理を実装することになります。
これをwhile True: + time.sleep(1) で実装するというのがあるあるパターンなのですが、ちょっと渋いコードになりやすいです。

また、素のboto3を使って実装すると、定型の処理が多く、ちょっとめんどくさいです。
本文を取り出すためにメッセージのJSONを深くまでパースしたり、正しく処理されたメッセージをSQSから削除するためにハンドラを指定してdeleteするなど、SQS流のお作法を押さえておく必要があります。

デーモン処理とSQS特有のお作法を抽象化・隠蔽するための、薄いフレームワークとしてsqs-pollingを作りました。SQSをサクッと使ってみたい、というときに検討してみてください。

論文メモ: MobileNets: Efficient Convolutional Neural Networks for Mobile Vision Applications

今回は軽量・高速なCNNの紹介です。

最新のいかついモデルを、GPUフル回転で学習し、目標の精度を達成したとしても、いざ実際にシステムに組み込む段階になると推論の遅さが足かせになって導入できない、というケースはしばしばあるのかなと思います。そういった意味で、軽量・高速なモデルというのは重要です。

  • Kaggleなども推論時間がシビアに制限されているコンペがあるそうです

CNNの軽量化・高速化のブレイクスルーの1つとなったMobileNetの提案論文 MobileNets: Efficient Convolutional Neural Networks for Mobile Vision Applications (arXiv) を読みましたので、そのメモとなります。

@article{DBLP:journals/corr/HowardZCKWWAA17,
  author    = {Andrew G. Howard and
               Menglong Zhu and
               Bo Chen and
               Dmitry Kalenichenko and
               Weijun Wang and
               Tobias Weyand and
               Marco Andreetto and
               Hartwig Adam},
  title     = {MobileNets: Efficient Convolutional Neural Networks for Mobile Vision
               Applications},
  journal   = {CoRR},
  volume    = {abs/1704.04861},
  year      = {2017},
  url       = {http://arxiv.org/abs/1704.04861},
  archivePrefix = {arXiv},
  eprint    = {1704.04861},
  timestamp = {Mon, 13 Aug 2018 16:46:35 +0200},
  biburl    = {https://dblp.org/rec/bib/journals/corr/HowardZCKWWAA17},
  bibsource = {dblp computer science bibliography, https://dblp.org}
}

MobileNets: Efficient Convolutional Neural Networks for Mobile Vision Applications

イシュー

AlexNet以降、"より深く"・"より複雑"なネットワーク (CNN) を構築することで精度を改善してきましたが、アプリケーションへの適用にあたってはリアルタイムな推論を限られた計算資源で行う必要があります。

  • 特にロボティックスやスマートドライビングなどの分野

本論文で提案されたMobileNetでは畳み込み層の内積回数とパラメータ数を減らすことで、推論速度の改善を実現してます。

  • モデルの圧縮ではなく、新たな畳み込み層アーキテクチャの提案です

アイディア

コアとなるアイディアは2つあります。

  • Depthwise separable convolutionsで計算量を削減 (主)
  • 精度と速度のトレードオフを調整するハイパパラメータの導入 (副)

Depthwise separable convolutions

従来の畳み込み層 (1層分) の計算量は  K \times K \times M \times N \times F \times F です。全ての要素が乗算されているため、大きな計算量となります。

  •  K \times K : フィルタ (カーネル) サイズ
  •  M : 入力チャネルサイズ,  N : 出力チャネルサイズ
  •  F \times F : 特徴マップサイズ (= 出力層のサイズ)
    • VGG16以降は入力層と同じサイズが一般的 (ex. 32x32 -> 32x32)

この1ステップの工程を2ステップ (depthwise -> pointwise) に分割した畳み込みを行うように変更します。

  • depthwise:  K \times K \times 1 のフィルタM個 => 出力サイズは  F \times F \times M
  • pointwise:  1 \times 1 \times M のフィルタN個 => 出力サイズは  F \times F \times N
    • 1x1畳み込みフィルタで、フィルタ数をコントロールしてます

f:id:ohke:20191102151507p:plain
本文FIg.2抜粋

この分割 (factorizeといいます) によって、計算量は  K \times K \times M \times F \times F + M \times N \times F \times F となります。第1項がdepthwise、第2項がpointwiseにそれぞれ対応してます。

比率で見ると  \frac{K \times K \times M \times F \times F + M \times N \times F \times F}{K \times K \times M \times N \times F \times F} = 1 / N + 1 / (K \times K) の計算量となります。一般的にK=3、N>=16のことが多いため、80%以上の計算量削減を見込めることがわかります。

精度と速度のトレードオフを調整するハイパパラメータ

精度と速度のトレードオフを調整するために、2つのハイパパラメータを導入してます。これによって様々なドメインやハードウェアへ適応することを狙ってます。

  • Width multiplier:  \alpha \in (0, 1 ]
    • チャネル数をそれぞれ  \alpha M, \alpha N に落とす => パラメータ数は概ね  \alpha^2 の削減
  • Resolution multiplier:  \rho \in (0, 1 ]
    • 特徴マップのサイズを  \rho F \times \rho F に落とす => パラメータ数は概ね  \rho^2 の削減

これら2つを組み込むと計算量は  K \times K \times \alpha M \times \rho (F \times F) + \alpha M \times \alpha N \times \rho (F \times F) となります。

アーキテクチャと評価

depthwise / pointwiseを組み込んだネットワークを構築・評価していきます。

画像 (224x224x3) から1000クラスに分類するImageNetのタスク用に、以下のネットワークを構築してます。

興味深いのは層ごとのパラメータ数の集計で、大した事なさそうな1x1畳み込み層が約3/4を占めており、支配的となっている点です。

通常の畳み込みと比較する (Table 4.) と、内積数で1/8、パラメータ数で1/7までそれぞれ削減しているが、Accuracyの低下は1.1%にとどまってます。これより効率的に計算量を削減できていることがわかります。

  •  \alpha = 0.75 のMobileNetと同程度のパラメータを持つ層数のMobileNetを比較 (Table 5.) すると、前者の方が高いAccuracyを達成しており、  alpha によって効率的にパラメータを減らすことが確認できます

他のモデルとの比較をしてますが、GoogleNetやVGG16などのより大きなモデルと同程度のAccuracyを実現してます (Table 8.)。 また、SqueezeNetやAlexNetなどの小さなCNNと比較 (Table 9.) しても、パラメータ数・Accuracyの両方において、MobileNetが優位であることがわかります。

  • 比較対象が提案された2017年当時でも少し古いモデルになっていますので、その点は注意です

まとめ

今回はMobileNetの提案論文について紹介しました。ビジネスへの適用を前提とすると、以下もMobileNetの重要な利点かなと思いました。

  • 畳み込み層の改善であるため、分類・物体検出・セグメンテーションなど様々なタスクに組み込むことができる
  • 構造が単純なため、どんなディープラーニングフレームワークでも簡単に実装できる

Python: unittest.mockでモックを作ってテストする

Python (3.3以降) でユニットテストのモックを楽に作れるunittest.mockが標準ライブラリとして提供されてます。今回はその紹介を行います。

外部モジュールに依存した実装をテストする難しさ

ユニットテストの実現において、DBやWeb APIなどのアプリケーション外のモジュールに依存している実装をどう扱うかは難しい問題です。 テストしやすいように (= 外部モジュールの影響範囲が小さくなるように) アプリケーションを適切に分割していくのも1つの手ですが、実務上、外部モジュールを含めたテストを完全には避けることは難しいです。

外部モジュールを含めたテストを行う場合、それらの振る舞いを擬態するモックが必要となります。 アプリケーションの機能とは直接関係のないテストのための実装なのですが、以下のように細かいケースに対応しないといけないので、なかなかの厄介者です。

  • 関数に適切な引数が渡されているかどうかチェックしたい
  • 返り値だけではなく、外部モジュールから例外がスローされた場合の分岐処理もテストしたい
  • 呼び出しの回数・順序に応じて振る舞いが変わるケースもテストしたい

unittest.mock

unittest.mockを使ったテストは3ステップで行います。

  • 外部モジュールを擬態するモックを作る => MagicMockで定義
  • モックをテスト対象に埋め込んでテスト対象モジュールを実行 => patchで注入
  • アサート (通常のunittestと同じ)

DBに依存したクラス (テストの対象)

DB (slite3) にアクセスするクラス・MemberModelを定義します。

sqlite3のmembersテーブルを更新するクラスで、create_membersupdate_emailの2つのメソッドを持ちます

import sqlite3


class MemberEntity(object):
    def __init__(self, id: int = None, name: str = None, email: str = None, nickname: str = None):
        self.id = id
        self.name = name
        self.email = email
        self.nickname = nickname


class MemberModel(object):
    def __init__(self, db):
        self.db = db

    def create_members(self, members: list) -> []:
        results = []

        conn = sqlite3.connect(self.db)

        for member in members:
            try:
                cur = conn.cursor()

                sql = "insert into members (name, email) values (?, ?)"
                # DBで採番される
                id = cur.execute(sql, (member.name, member.email,)).lastrowid

                # idを使って被らないニックネームを生成して更新
                default_nickname = "{}_{}".format(member.name, str(id))
                sql = "update members set nickname = ? where id = ?"
                cur.execute(sql, (default_nickname, id))

                conn.commit()

                results.append(MemberEntity(id, member.name, member.email, default_nickname))
            except:
                # 例外が発生したらロールバック
                conn.rollback()

        conn.close()

        return results

    def update_email(self, member: MemberEntity) -> int:
        try:
            with sqlite3.connect(self.db) as conn:
                cur = conn.cursor()

                sql = "update members set email = ? where id = ?"
                cur.execute(sql, (member.email, member.id))

                conn.commit()
        except sqlite3.IntegrityError:
            # emailが重複した場合は0を返す
            conn.rollback()
            return 0

        # 更新できた場合は1を返す
        return 1

membersテーブルは以下で定義してます。ポイントは3点です。

  • 自動採番されるidを主キーとする
  • emailはユニーク制約付き
  • nickname{name}_{id}で初期化する
    • 自動採番されるidを含むため、INSERT後にUPDATEする必要がある
create table members
(
    id       integer
        primary key,
    name     varchar(256) not null,
    email    varchar(256) not null
        unique,
    nickname varchar(256)
);

ユニットテストの実装

create_membersで1レコードをインサートしてMemberEntityオブジェクトを返す処理のユニットテストを以下で実装します。

import unittest
from unittest.mock import PropertyMock, MagicMock, patch
import sqlite3

from member import MemberEntity, MemberModel


class TestMemberModel(unittest.TestCase):
    def test_create_members_1(self):
        # Step 1. モックオブジェクトの作成

        result_mock = MagicMock()
        type(result_mock).lastrowid = PropertyMock(return_value=1)
        # プロパティは以下のようにdictで渡すこともできる
        # result_mock = MagicMock(**{"lastrowid": 1})

        cur_mock = MagicMock()
        cur_mock.execute.return_value = result_mock

        conn_mock = MagicMock()
        conn_mock.cursor.return_value = cur_mock

        # Step 2. モックの差し込みとテスト対象モジュールの実行

        with patch("sqlite3.connect", return_value=conn_mock):
            members = [MemberEntity(None, "Name1", "Email1", None)]

            model = MemberModel(db="dummy")

            new_members = model.create_members(members)

        # Step 3. アサート

        # 1度だけ呼び出されたことをチェック
        conn_mock.commit.assert_called_once_with()

        # 1度も呼び出されていないことをチェック
        conn_mock.rollback.assert_not_called()

        # 返り値のチェック
        assert len(new_members) == 1
        assert new_members[0].id == 1
        assert new_members[0].nickname == "Name1_1"

... (続く) ...

MagicMockの生成

Step 1.のポイントはMagicMockを使ったモックオブジェクトの生成です。

MagicMockは任意のクラスや関数としての振る舞いを定義できます。
conn_mockはcursorメソッド、cur_mockはexecuteメソッド、result_mockはlastrowidプロパティを持つことが表現されています。プロパティはPropertyMockを使って定義します。

  • MagicMockはMockクラスを継承しており、__str__などのいくつかのマジックメソッドが定義されたものです

MagicMockのメソッドの返り値は、return_valueによって任意の値やオブジェクトを設定できます。
MagicMockを入れ子にすることで、conn_mock.cursor()でcur_mock、cur_mock.execute()でresult_mockをそれぞれ返すようにしています。

        result_mock = MagicMock()
        type(result_mock).lastrowid = PropertyMock(return_value=1)
        # プロパティは以下のようにdictで渡すこともできる
        # result_mock = MagicMock(**{"lastrowid": 1})

        cur_mock = MagicMock()
        cur_mock.execute.return_value = result_mock

        conn_mock = MagicMock()
        conn_mock.cursor.return_value = cur_mock

patch

Step 2.では生成したモックオブジェクトを、テスト対象のモジュールに外 (テストコード) から差し込み、 モジュールを実行してます。

この差し込みにはpatchを使います。
1つ目の引数 (traget) にsqlite3.connectを指定し、2つ目の引数 (return_value) に上で作成したconn_mockを渡しています。with句の区間内では、sqlite3.connectが参照するオブジェクトはconn_mockとなります。

  • with句以外にも、デコレータとして使うこともでき、関数内・クラス内といった柔軟なスコープで差し込みができます

それ以外は通常のテスト同様、対象のモジュールを実行してます。

        with patch("sqlite3.connect", return_value=conn_mock):
            members = [MemberEntity(None, "Name1", "Email1", None)]

            model = MemberModel(db="dummy")

            new_members = model.create_members(members)

MagicMockのアサーション

Step 3.では返り値のチェック等を行っています。

MagicMock特有の機能として、コールされたかどうかをチェックするassert_called_once_withassert_not_calledなどがあります。

  • MagicMockはコールの履歴をmock_callsmethod_callsにそれぞれ持っており、それによってより詳細に回数や順序をチェックすることができます
        # 1度だけ呼び出されたことをチェック
        conn_mock.commit.assert_called_once_with()

        # 1度も呼び出されていないことをチェック
        conn_mock.rollback.assert_not_called()

        # 返り値のチェック
        assert len(new_members) == 1
        assert new_members[0].id == 1
        assert new_members[0].nickname == "Name1_1"

呼び出し履歴によってモックの返り値を変更する

members.nicknameは末尾に"_{id}"を付けることで、同じnameでも異なるデフォルト値になるようにしています。それを確認するユニットテストでは、同じexecuteの呼び出しでも異なるidを返すようにモックを実装する必要があります。

こういったケースではside_effectが役立ちます。
以下ではPropertyMock(side_effect=[1, 2])として返り値のリストを渡すことで、1回目の呼び出しでは1、2回目の呼び出しでは2が返されるようにしてます。

class TestMemberModel(unittest.TestCase):
    ... (続き) ...

    def test_create_members_2(self):
        result_mock = MagicMock()
        # 戻り値を1回目は1、2回目は2とする
        type(result_mock).lastrowid = PropertyMock(side_effect=[1, 2])

        cur_mock = MagicMock()
        cur_mock.execute.return_value = result_mock

        conn_mock = MagicMock()
        conn_mock.cursor.return_value = cur_mock

        with patch("sqlite3.connect", return_value=conn_mock):
            members = [
                MemberEntity(None, "Name", "Email1", None),
                MemberEntity(None, "Name", "Email2", None),
            ]

            model = MemberModel(db="dummy")

            new_members = model.create_members(members)

        assert len(new_members) == 2
        # 同じnameでもnicknameは異なること
        assert new_members[0].nickname != new_members[1].nickname

... (続く) ...

モックから例外をスローする

side_effectに例外オブジェクトを設定すると、呼び出しによって例外がスローされます。
members.emailのユニーク制約エラー (sqlite3.IntegrityError) をスローさせ、返り値が0になることをチェックしてます。

class TestMemberModel(unittest.TestCase):
    ... (続き) ...

    def test_update_email_unique_error(self):
        # executeでユニーク制約エラーを発生させる (戻り値0)
        cur_mock = MagicMock()
        cur_mock.execute = MagicMock(side_effect=sqlite3.IntegrityError())

        conn_enter_mock = MagicMock()
        conn_enter_mock.cursor.return_value = cur_mock

        # with sqlite.connect() as conn: の場合
        conn_mock = MagicMock()
        conn_mock.__enter__.return_value = conn_enter_mock

        with patch("sqlite3.connect", return_value=conn_mock):
            member = MemberEntity(1, "Name1", "duplicated_email", "Name1_1")

            model = MemberModel(db="dummy")

            ret = model.update_email(member)

        assert ret == 0

unittest.mockのつらいところ

ここまでunittest.mockでよく使う機能を中心に紹介しましたが、実際に使ってみるとつらいところというのもいくつかありました。

高機能なモジュールのモックの定義が冗長になりやすい

今回はsqlite3のconnectを起点にしたモックを作りましたが、ConnectionおよびCursorのそれぞれのメソッドとプロパティを置き換える必要があり、ちょっとめんどくさいです。
モックを使う場合でも、外部モジュールに依存する箇所が限定されるようにクラス設計する必要性は変わらないということですね。

モックの定義やアサートが、テスト対象モジュールの細かな実装に依存している

説明を省いたのですが、上の例ではcreate_membersとupdate_emailでコネクションの張り方が異なっています。前者はconn = sqlite3.connect(db)、後者はwith sqlite3.connect(db) as conn:で取得してます。
実装上は大きな違いでは無いのですが、モックの定義はそれぞれに合わせて変える必要があります。というのも、後者はConnection.__enter__()がオブジェクトを返すためです。

他にも、assert_called_once_withで引数のチェックもできるのですが、引数名の有無なども含めて厳密に一致する必要があります。

どんなアクセスでも受容するMagicMockの特性上このあたりは致し方ないのですが、書き方によってテストをpassしたりfailしたりするとちょっとストレスです。
コネクションの取得などは共通のモジュールにまとめる、細かい引数などのチェックは行わずにアウトプットでアサートできるようにクラス設計する、などの工夫が必要です。

まとめ

DBに依存したモデルクラスに対して、unittest.mockのモックを使ったユニットテストを実装し、使い方や欠点などをまとめました。