RustでSQSのメッセージを送受信する (Rusoto)
AWSのマネージドサービスのSDKライブラリの最右翼Rusotoの使い方を、SQSとのメッセージのやり取りを通して、ひと通り触れていきます。
Rusoto
RusotoはAWSマネージドサービスの非公式SDKです。AWSの各サービスのAPIを叩くラッパ (バインダ) となっており、crate.ioで提供されています。
Rusotoはマネージドサービス別のライブラリ (rusoto_sqs, rusoto_s3など) に加えて、リージョン情報などを持つrusoto_coreを依存関係に追加する必要があります。SQSを使う場合、Cargo.tomlはこうなります。
- 本投稿では現時点で最新のstableバージョンを使ってます
- 0.43.0からasync/awaitにも対応されそうということなので、主要な使い方が若干かわる可能性があります
[dependencies] rusoto_core = "0.42.0" rusoto_sqs = "0.42.0"
こちらの記事がRusotoの概要を押さえるためにはとても参考になります。
Rustから扱うAWS API | κeenのHappy Hacκing Blog
SQS
実行時には環境変数にAWS_ACCESS_KEY_IDとAWS_SECRET_ACCESS_KEYをセットしておきます。
$ AWS_ACCESS_KEY_ID=XXXXX AWS_SECRET_ACCESS_KEY=YYYYY cargo run
Rusotoの基本的な使い方は3ステップです。
- サービスごとに定義されたクライアント (ここではSqsClient) を生成する
- オペレーションごとに定義されたリクエスト構造体 (ここではSendMessageRequest) を生成する
- クライアントにリクエストを渡してオペレーションを実行し、結果をResultで受け取る
メッセージの送信
まず1通ずつSQSに送信するコードは以下となります。SqsClientのsend_messageを実行します。
- リクエストはDefaultトレイトが実装されてます
- SendMessageRequestの場合は、必須なのはqueue_urlとmessage_bodyの2つのみで、残りはデフォルト値を設定してます
- クライアントの各メソッドはFutureトレイトを実装したRusotoFutureが返されます
- syncで完了を待ち、unwrapで結果 (SendMessageResult) を取り出します
use rusoto_sqs::{ SqsClient, Sqs, Message, ReceiveMessageRequest, SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest, DeleteMessageRequest }; fn main() { let region = "us-west-2"; let queue_url = "https://sqs.us-west-2.amazonaws.com/0123456789/test-sqs"; let sqs_client = SqsClient::new(region.parse().unwrap()); let request = SendMessageRequest { queue_url: queue_url.to_string(), message_body: "body".to_string(), ..Default::default() }; let result = sqs_client.send_message(request).sync().unwrap(); println!("{:?}", result); // SendMessageResult { // md5_of_message_attributes: None, // md5_of_message_body: Some("841a2d689ad86bd1611447453c22c6fc"), // md5_of_message_system_attributes: None, // message_id: Some("a425d096-58d5-4d6d-888e-ec943ad067ce"), // sequence_number: None // } }
SQSは最大10通まで同時にバッチ送信できますが、Rusotoでもsend_message_batchでそれができます。
- SendMessageBatchRequestEntryのVecをSendMessageBatchRequestでラップします
- このようにリクエストごとに異なる構造体を使いますので、わかりやすい反面、use節が大きくなりやすいです
// ... 省略 ... fn main() { // ... 省略 ... let mut entries = vec![]; for i in 0..10 { entries.push(SendMessageBatchRequestEntry { id: i.to_string(), message_body: format!("body: {}", i), ..Default::default() }); } let request = SendMessageBatchRequest { queue_url: queue_url.to_string(), entries: entries, }; let result = sqs_client.send_message_batch(request).sync().unwrap(); }
メッセージの受信
メッセージの受信の場合も同様に、ReceiveMessageRequestを作り、SqsClientのreceive_messageメソッドを実行することでできます。結果はVec[Message]で返ってきます。
- max_number_of_messagesに2以上の値を設定すると複数通数が得られるようになります
// ... 省略 ... fn main() { // ... 省略 ... let request = ReceiveMessageRequest { queue_url: queue_url.to_string(), max_number_of_messages: Some(1), ..Default::default() }; let result = sqs_client.receive_message(request).sync().unwrap(); println!("{:?}", result); // ReceiveMessageResult { // messages: Some([Message { // attributes: None, body: Some("body"), // md5_of_body: Some("841a2d689ad86bd1611447453c22c6fc"), // md5_of_message_attributes: None, // message_attributes: None, // message_id: Some("cab2da51-5a9f-4c8e-8d55-5b13470ed10c"), // receipt_handle: Some("AQEBjJQBhpuy2/4gKsTQD6YVDz+uwL2Th+wnfw9qxgn9U8gsk5CHuGIB1l5p15KLuEGDXaLIb4wQmW/31rOZnXSYsNmfVcuUHG8Pa/GK4q8TQsEu2lBWW1MBPVv2Xn1pWeE4+XHwGrLnQTCWizgAnKZTHRz0b5nUz4pIIU5liGrQ+EAx0GFxiHXbVqQAck8UuXIO3s1o3st3oIyPT8nrmMp23uaWRN3MqvUNeFCi//LE9Vg5iF0If8whH8xDGdGU6CX7ZCSJPr96pzonJanA/82gfFz7G4mlMiKLv8JCo83BL3xVd0YnSnJVgMaOM6yeuOanx/8PxRD4QiOhBzU7WPJPQ3nO9eD12iwg/nN/tZA++UDbp4EwhbUjL6EMhHDk+WfdlL3AtQLfLDMsnT2C4ZytEA==") // }]) // } }
メッセージの削除
メッセージの削除も、Messageのreceipt_handleを指定したDeleteMessageRequestを生成して、delete_messageを実行すればOKです。
- delete_message_batchでまとめて削除する場合、バッチ送信同様、DeleteMessageBatchRequestEntryのVecをDeleteMessageBatchRequestでラップしてリクエストするとできます
// ... 省略 ... fn main() { // ... 省略 ... let request = DeleteMessageRequest { queue_url: queue_url.to_string(), receipt_handle: message.receipt_handle.unwrap().clone(), }; let result = sqs_client.delete_message(request).sync().unwrap(); println!("{:?}", result); // () }
まとめ
AWS SQSのメッセージを送受信を通して、Rusotoの使い方を見ていきました。