AWSのキューメッセージサービスSQSのメッセージを、Boto3を使ってPythonで送受信する方法の備忘録です。
Boto3のインストール、AWSの設定、および、SQSの作成は完了している前提です。
$ pip install boto3 $ aws configure AWS Access Key ID [****************XXXX]: AWS Secret Access Key [****************XXXX]: Default region name [ap-northeast-1]: Default output format [json]:
詳細なBoto3のインタフェースはこちらです。 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html
送信
メッセージの送信には send_message または send_message_batch メソッドを使います。
import boto3 sqs_client = boto3.client("sqs") queue_url = "https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXXXX/my-sqs" response = sqs_client.send_message(QueueUrl=queue_url, MessageBody="message 1") print(response) # { # 'MD5OfMessageBody': '1db65a6a0a818fd39655b95e33ada11d', # 'MessageId': '58da63b0-426b-4fda-9b47-64017fd449dd', # 'ResponseMetadata': { # 'RetryAttempts': 0, # 'HTTPHeaders': { # 'content-type': 'text/xml', # 'content-length': '378', # 'x-amzn-requestid': '9219238c-8640-5cb4-826f-754810eddbac', # 'date': 'Sat, 19 Oct 2019 04:43:25 GMT' # }, # 'RequestId': '9219236c-8640-5cb4-826f-754810edd8ac', # 'HTTPStatusCode': 200 # } # }
実用上、複数の値を受け渡ししやすいように、JSON文字列にシリアライズして送ることが多いかなと思います。
import json sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps({"key1": 123, "key2": "hoge"}))
send_message_batchは以下のように使います。
- Entries引数に辞書のリストを渡すのですが、$.Id
と$.MessageBody
のキーが必須となります
- $.Id
はリクエスト内でユニークな値である必要があり、かつ、80字以内のため、連番やuuid.uuid4().__str__()
で生成すると良いかと思います
- SQSには1回あたりのバッチ送信可能なサイズに制限があり、256KBです
sqs_client.send_message_batch( QueueUrl=queue_url, Entries=[{"Id": "1", "MessageBody": "message 1"}, {"Id": "2", "MessageBody": "message 2"}] )
受信
SQSのメッセージを受信するには receive_message
メソッドを使います。
レスポンスはdictで得られ、$.Messages.Body
に送信されたメッセージが入ってます。
- Message
は配列で、引数MaxNumberOfMessages
(デフォルト1) に2以上の値を設定することで、複数のメッセージを同時に受け取れます (最大10)
- 引数VisibilityTimeout
は、受信されたメッセージが再度受信可能となるまでの時間 (秒) で、この間は他のリクエストによって受信できなくなります
- 詳しくはSQSのドキュメント https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html を参照ください
- MessageId
が送信時の値と一致していることも確認してください
import boto3 queue_url = "https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXXXX/my-sqs" response = sqs_client.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1, VisibilityTimeout=60) print(response) # { # 'Messages': [{ # 'ReceiptHandle': 'AQEBBx/pk6wa...', # 'MD5OfBody': '1db65a6a0a818fd39655b95e33ada11d', # 'MessageId': '58da63b0-426b-4fda-9b47-64017fd449dd', # 'Body': 'message 1' # }], # 'ResponseMetadata': { # 'RetryAttempts': 0, # 'HTTPHeaders': { # 'content-type': 'text/xml', # 'content-length': '860', # 'x-amzn-requestid': '66222d85-7321-79a2-b21a-7ba1eb393a4a', # 'date': 'Sat, 19 Oct 2019 04:49:28 GMT' # }, # 'RequestId': '66222d85-7321-59a2-b21a-7ba1eb393a4a', # 'HTTPStatusCode': 200 # } # }
メッセージが空 (SQSに処理可能なメッセージが無い) 場合、$.Messages
が無いdictが得られます。このため、メッセージを取り出す前に "Messages" in response
などでチェックを行う必要があります。
{ 'ResponseMetadata': { 'RetryAttempts': 0, 'HTTPHeaders': {...}, 'RequestId': 'd7aa8408-0fa8-5a29-8538-d22c980749c1', 'HTTPStatusCode': 200 } }
メッセージの削除
正常に受信が行われた場合でも、そのままではSQSにメッセージが残り続けます。そのため、受信側で明示的に削除する必要があります。
メッセージの削除には delete_message
または delete_message_batch
メソッドへ、受信時に得られた$.Messages.ReceiptHandle
を引数に渡すことで削除します。
for message in response["Messages"]: sqs_client.delete_message(QueueUrl=queue_url, ReceiptHangle=message["ReceiptHandle"]) # または sqs_client.delete_message_batch( QueueUrl=queue_url, Entries=[{"Id": "1", "ReceiptHandle": "..."}, {"Id": "2", "ReceiptHandle": "..."}, ...] )