SQSを永遠にポーリングするPythonパッケージ sqs-polling を作りました

タイトルの通りで、SQSを永遠にポーリングするPythonパッケージを作ってPyPiに公開しました。

pypi.org

GitHubはこちら。

github.com

使い方

pip install sqs-pollingでインストール。あとは以下のように記述すると、延々とSQSからメッセージを取り出し、コールバックを実行してくれます。最小コードでSQSのデーモンを実現できることが最大の売りです。

from sqs_polling import sqs_polling

def your_callback(message, greeting):
    print(greeting + message["name"])
    return True

your_queue_url="https://sqs.your-region.amazonaws.com/XXXXXXXXXXXX/your-sqs",
sqs_polling(queue_url=your_queue_url, callback=your_callback, callback_args={"greeting": "Hello, "})

その他のお便利ポイントとしては3点あります。

  • SQSメッセージの本文 $.Messages.Body を勝手に取り出してコールバックの引数にしてくれる (オプションでメッセージ全体も取り出し可能)
  • コールバックの返り値をTrueとするだけでSQSからメッセージを削除してくれます (逆にFalseとするとメッセージが残せますので再処理なども自由にコントロールできます)
  • マルチスレッド・マルチプロセスにも対応

モチベーション

AWSにおいて、コンポネント間の通信を疎結合にし、かつ、再処理が容易なアーキテクチャを手軽に実現するために、SQSを採用しているシステムが多いかと思います。

SQSを受信する側は、SQSからメッセージを取り出し -> 何らかの加工や更新を行い -> 正常に終わったらメッセージを削除する を繰り返すデーモン的な処理を実装することになります。
これをwhile True: + time.sleep(1) で実装するというのがあるあるパターンなのですが、ちょっと渋いコードになりやすいです。

また、素のboto3を使って実装すると、定型の処理が多く、ちょっとめんどくさいです。
本文を取り出すためにメッセージのJSONを深くまでパースしたり、正しく処理されたメッセージをSQSから削除するためにハンドラを指定してdeleteするなど、SQS流のお作法を押さえておく必要があります。

デーモン処理とSQS特有のお作法を抽象化・隠蔽するための、薄いフレームワークとしてsqs-pollingを作りました。SQSをサクッと使ってみたい、というときに検討してみてください。