け日記

最近はPythonでいろいろやってます

AWS SQSのメッセージをPythonで送受信する

AWSのキューメッセージサービスSQSのメッセージを、Boto3を使ってPythonで送受信する方法の備忘録です。

Boto3のインストール、AWSの設定、および、SQSの作成は完了している前提です。

$ pip install boto3

$ aws configure
AWS Access Key ID [****************XXXX]: 
AWS Secret Access Key [****************XXXX]: 
Default region name [ap-northeast-1]: 
Default output format [json]: 

詳細なBoto3のインタフェースはこちらです。 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html

送信

メッセージの送信には send_message または send_message_batch メソッドを使います。

import boto3

sqs_client = boto3.client("sqs")

queue_url = "https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXXXX/my-sqs"

response = sqs_client.send_message(QueueUrl=queue_url, MessageBody="message 1")
print(response)
# {
#   'MD5OfMessageBody': '1db65a6a0a818fd39655b95e33ada11d', 
#   'MessageId': '58da63b0-426b-4fda-9b47-64017fd449dd', 
#   'ResponseMetadata': {
#     'RetryAttempts': 0, 
#     'HTTPHeaders': {
#       'content-type': 'text/xml', 
#       'content-length': '378', 
#       'x-amzn-requestid': '9219238c-8640-5cb4-826f-754810eddbac', 
#       'date': 'Sat, 19 Oct 2019 04:43:25 GMT'
#     }, 
#     'RequestId': '9219236c-8640-5cb4-826f-754810edd8ac', 
#     'HTTPStatusCode': 200
#   }
# }

実用上、複数の値を受け渡ししやすいように、JSON文字列にシリアライズして送ることが多いかなと思います。

import json

sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps({"key1": 123, "key2": "hoge"}))

send_message_batchは以下のように使います。 - Entries引数に辞書のリストを渡すのですが、$.Id$.MessageBodyのキーが必須となります - $.Idはリクエスト内でユニークな値である必要があり、かつ、80字以内のため、連番やuuid.uuid4().__str__()で生成すると良いかと思います - SQSには1回あたりのバッチ送信可能なサイズに制限があり、256KBです

sqs_client.send_message_batch(
    QueueUrl=queue_url, 
    Entries=[{"Id": "1", "MessageBody": "message 1"}, {"Id": "2", "MessageBody": "message 2"}]
)

受信

SQSのメッセージを受信するには receive_message メソッドを使います。

レスポンスはdictで得られ、$.Messages.Bodyに送信されたメッセージが入ってます。 - Messageは配列で、引数MaxNumberOfMessages (デフォルト1) に2以上の値を設定することで、複数のメッセージを同時に受け取れます (最大10) - 引数VisibilityTimeoutは、受信されたメッセージが再度受信可能となるまでの時間 (秒) で、この間は他のリクエストによって受信できなくなります - 詳しくはSQSのドキュメント https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html を参照ください - MessageIdが送信時の値と一致していることも確認してください

import boto3

queue_url = "https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXXXX/my-sqs"

response = sqs_client.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1, VisibilityTimeout=60)
print(response)
# {
#   'Messages': [{
#     'ReceiptHandle': 'AQEBBx/pk6wa...', 
#     'MD5OfBody': '1db65a6a0a818fd39655b95e33ada11d', 
#     'MessageId': '58da63b0-426b-4fda-9b47-64017fd449dd', 
#     'Body': 'message 1'
#   }],
#   'ResponseMetadata': {
#     'RetryAttempts': 0, 
#     'HTTPHeaders': {
#       'content-type': 'text/xml', 
#       'content-length': '860', 
#       'x-amzn-requestid': '66222d85-7321-79a2-b21a-7ba1eb393a4a', 
#       'date': 'Sat, 19 Oct 2019 04:49:28 GMT'
#     }, 
#     'RequestId': '66222d85-7321-59a2-b21a-7ba1eb393a4a', 
#     'HTTPStatusCode': 200
#   }
# }

メッセージが空 (SQSに処理可能なメッセージが無い) 場合、$.Messagesが無いdictが得られます。このため、メッセージを取り出す前に "Messages" in response などでチェックを行う必要があります。

{
  'ResponseMetadata': {
    'RetryAttempts': 0, 
    'HTTPHeaders': {...}, 
    'RequestId': 'd7aa8408-0fa8-5a29-8538-d22c980749c1', 
    'HTTPStatusCode': 200
  }
}

メッセージの削除

正常に受信が行われた場合でも、そのままではSQSにメッセージが残り続けます。そのため、受信側で明示的に削除する必要があります。

メッセージの削除には delete_message または delete_message_batch メソッドへ、受信時に得られた$.Messages.ReceiptHandleを引数に渡すことで削除します。

for message in response["Messages"]:
    sqs_client.delete_message(QueueUrl=queue_url, ReceiptHangle=message["ReceiptHandle"])

# または
sqs_client.delete_message_batch(
    QueueUrl=queue_url,
    Entries=[{"Id": "1", "ReceiptHandle": "..."}, {"Id": "2", "ReceiptHandle": "..."}, ...]
)