RustでSQSのメッセージを送受信する (Rusoto)

AWSのマネージドサービスのSDKライブラリの最右翼Rusotoの使い方を、SQSとのメッセージのやり取りを通して、ひと通り触れていきます。

Rusoto

RusotoはAWSマネージドサービスの非公式SDKです。AWSの各サービスのAPIを叩くラッパ (バインダ) となっており、crate.ioで提供されています。

github.com

Rusotoはマネージドサービス別のライブラリ (rusoto_sqs, rusoto_s3など) に加えて、リージョン情報などを持つrusoto_coreを依存関係に追加する必要があります。SQSを使う場合、Cargo.tomlはこうなります。

  • 本投稿では現時点で最新のstableバージョンを使ってます
    • 0.43.0からasync/awaitにも対応されそうということなので、主要な使い方が若干かわる可能性があります
[dependencies]
rusoto_core = "0.42.0"
rusoto_sqs = "0.42.0"

こちらの記事がRusotoの概要を押さえるためにはとても参考になります。

Rustから扱うAWS API | κeenのHappy Hacκing Blog

SQS

実行時には環境変数にAWS_ACCESS_KEY_IDとAWS_SECRET_ACCESS_KEYをセットしておきます。

$ AWS_ACCESS_KEY_ID=XXXXX AWS_SECRET_ACCESS_KEY=YYYYY cargo run

Rusotoの基本的な使い方は3ステップです。

  1. サービスごとに定義されたクライアント (ここではSqsClient) を生成する
  2. オペレーションごとに定義されたリクエスト構造体 (ここではSendMessageRequest) を生成する
  3. クライアントにリクエストを渡してオペレーションを実行し、結果をResultで受け取る

メッセージの送信

まず1通ずつSQSに送信するコードは以下となります。SqsClientのsend_messageを実行します。

  • リクエストはDefaultトレイトが実装されてます
    • SendMessageRequestの場合は、必須なのはqueue_urlとmessage_bodyの2つのみで、残りはデフォルト値を設定してます
  • クライアントの各メソッドはFutureトレイトを実装したRusotoFutureが返されます
    • syncで完了を待ち、unwrapで結果 (SendMessageResult) を取り出します
use rusoto_sqs::{
    SqsClient, Sqs, Message, ReceiveMessageRequest,
    SendMessageBatchRequest, SendMessageBatchRequestEntry,
    SendMessageRequest, DeleteMessageRequest
};

fn main() {
    let region = "us-west-2";
    let queue_url = "https://sqs.us-west-2.amazonaws.com/0123456789/test-sqs";

    let sqs_client = SqsClient::new(region.parse().unwrap());

    let request = SendMessageRequest {
        queue_url: queue_url.to_string(),
        message_body: "body".to_string(),
        ..Default::default()
    };

    let result = sqs_client.send_message(request).sync().unwrap();
    println!("{:?}", result);
    // SendMessageResult {
    //    md5_of_message_attributes: None, 
    //    md5_of_message_body: Some("841a2d689ad86bd1611447453c22c6fc"), 
    //    md5_of_message_system_attributes: None, 
    //    message_id: Some("a425d096-58d5-4d6d-888e-ec943ad067ce"), 
    //    sequence_number: None
    // }
}

SQSは最大10通まで同時にバッチ送信できますが、Rusotoでもsend_message_batchでそれができます。

  • SendMessageBatchRequestEntryのVecをSendMessageBatchRequestでラップします
    • このようにリクエストごとに異なる構造体を使いますので、わかりやすい反面、use節が大きくなりやすいです
// ... 省略 ...

fn main() {
    // ... 省略 ...

    let mut entries = vec![];
    for i in 0..10 {
        entries.push(SendMessageBatchRequestEntry {
            id: i.to_string(),
            message_body: format!("body: {}", i),
            ..Default::default()
        });
    }

    let request = SendMessageBatchRequest {
        queue_url: queue_url.to_string(),
        entries: entries,
    };

    let result = sqs_client.send_message_batch(request).sync().unwrap();
}

メッセージの受信

メッセージの受信の場合も同様に、ReceiveMessageRequestを作り、SqsClientのreceive_messageメソッドを実行することでできます。結果はVec[Message]で返ってきます。

  • max_number_of_messagesに2以上の値を設定すると複数通数が得られるようになります
// ... 省略 ...

fn main() {
    // ... 省略 ...

    let request = ReceiveMessageRequest {
        queue_url: queue_url.to_string(),
        max_number_of_messages: Some(1),
        ..Default::default()
    };

    let result = sqs_client.receive_message(request).sync().unwrap();
    println!("{:?}", result);
    // ReceiveMessageResult {
    //     messages: Some([Message {
    //         attributes: None, body: Some("body"), 
    //         md5_of_body: Some("841a2d689ad86bd1611447453c22c6fc"), 
    //         md5_of_message_attributes: None,
    //         message_attributes: None, 
    //         message_id: Some("cab2da51-5a9f-4c8e-8d55-5b13470ed10c"), 
    //         receipt_handle: Some("AQEBjJQBhpuy2/4gKsTQD6YVDz+uwL2Th+wnfw9qxgn9U8gsk5CHuGIB1l5p15KLuEGDXaLIb4wQmW/31rOZnXSYsNmfVcuUHG8Pa/GK4q8TQsEu2lBWW1MBPVv2Xn1pWeE4+XHwGrLnQTCWizgAnKZTHRz0b5nUz4pIIU5liGrQ+EAx0GFxiHXbVqQAck8UuXIO3s1o3st3oIyPT8nrmMp23uaWRN3MqvUNeFCi//LE9Vg5iF0If8whH8xDGdGU6CX7ZCSJPr96pzonJanA/82gfFz7G4mlMiKLv8JCo83BL3xVd0YnSnJVgMaOM6yeuOanx/8PxRD4QiOhBzU7WPJPQ3nO9eD12iwg/nN/tZA++UDbp4EwhbUjL6EMhHDk+WfdlL3AtQLfLDMsnT2C4ZytEA==") 
    //     }]) 
    // }
}

メッセージの削除

メッセージの削除も、Messageのreceipt_handleを指定したDeleteMessageRequestを生成して、delete_messageを実行すればOKです。

  • delete_message_batchでまとめて削除する場合、バッチ送信同様、DeleteMessageBatchRequestEntryのVecをDeleteMessageBatchRequestでラップしてリクエストするとできます
// ... 省略 ...

fn main() {
    // ... 省略 ...

    let request = DeleteMessageRequest {
        queue_url: queue_url.to_string(),
        receipt_handle: message.receipt_handle.unwrap().clone(),
    };

    let result = sqs_client.delete_message(request).sync().unwrap();
    println!("{:?}", result);  
    // ()
}

まとめ

AWS SQSのメッセージを送受信を通して、Rusotoの使い方を見ていきました。