すべてのプロダクト
Search
ドキュメントセンター

:トランザクションメッセージ

最終更新日:Mar 22, 2020

メッセージサービスベースのトランザクションメッセージを実装する方法

状況

ローカル操作は、メッセージ送信トランザクションと一貫している必要があります。つまり、メッセージが正常に送信された場合、ローカル操作は成功します。メッセージの送信に失敗した場合、ローカル操作は失敗します(メッセージは正常に送信されたとしてもロールバックする必要があります)。これにより、操作が成功するが、メッセージの送信に失敗したり、操作が失敗してもメッセージは正常に送信されたりすることはありません。

さらに、コンシューマプログラムのクラッシュに起因するメッセージ処理が失敗した結果、コンシューマの進行状況を手動でリセットしないように、コンシューマエンドでメッセージを1回正常に処理する必要があります。

解決策

メッセージサービスのメッセージ遅延機能を使用して、トランザクションメッセージを実装することができます。

準備

2つのキューを作成します。

  1. トランザクションメッセージキュー

    メッセージの有効期間は、メッセージの遅延時間よりも短くなっています。つまり、プロデューサがメッセージの可視時間を積極的に変更(送信)しない場合、メッセージはコンシューマには見えません。

  2. 操作ログキュー

    操作ログ・キューには、トランザクション・メッセージの操作が記録されます。メッセージ遅延時間は、トランザクション操作のタイムアウト時間です。ログキュー内のメッセージが確認(削除)された後、そのメッセージはコンシューマには見えません。

手順

  1. プロデューサは、トランザクション準備メッセージをトランザクションメッセージキューに送信します。

  2. プロデューサは、操作ログ・メッセージを操作ログ・キューに書き込みます。ログには、ステップ1のメッセージのハンドルが含まれています。

  3. プロデューサはローカルトランザクション操作を実行します。

  4. ステップ3の操作が成功すると、プロデューサはメッセージを送信します(これはコンシューマに表示されます)。それ以外の場合、プロデューサはメッセージをロールバックします。

  5. プロデューサは、ステップ2で操作ログを確認します(およびログメッセージを削除します)。

  6. ステップ4が完了すると、コンシューマはトランザクションメッセージを受信できます。

  7. コンシューマはメッセージを処理する。

  8. コンシューマは、メッセージを確認して削除する。

次の図を参照してください。

例外解析:

プロデューサの例外(例:プロセスの再起動)

A. 操作ログキューのタイムアウトにより確認されなかったログを読み取ります。

B. 取引結果を確認する。

C. チェックの結果、トランザクションが成功したことが示された場合は、メッセージを送信します。(繰り返し送信されたメッセージはシステムには影響しません。同じハンドルを持つメッセージは、1度だけ正常に送信できます)。

D. 操作ログを確認します。

コンシューマの例外(例:プロセスの再起動)

メッセージは少なくとも1回コンシューマで処理されなければなりません。ステップ8が失敗した場合、メッセージは一定期間後にコンシューマに表示され、現在または別のコンシューマによって処理されます。

到達不能メッセージサービス(例:ネットワーク切断)

メッセージの送受信状態と操作ログはMessage Serviceに保存され、高い信頼性と高い可用性を備えています。ネットワーク接続が復旧した後、トランザクションを継続的に実装して、操作とメッセージ送信トランザクションとの一貫性を確保することができます。

Transaction Message

コードの実装:

最新のMessage Service Java SDK(1.1.5)のTransactionQueueは、前述のトランザクションメッセージソリューションをサポートしています。TransactionOperationsとTransactionCheckerにサービス操作とチェックロジックを追加することで、簡単にトランザクションメッセージを実装できます。

デモコード

  1. public class TransactionMessageDemo{
  2. public class MyTransactionChecker implements TransactionChecker
  3. {
  4. public boolean checkTransactionStatus(Message message)
  5. {
  6. boolean checkResult = false;
  7. String messageHandler = message.getReceiptHandle();
  8. try{
  9. //TODO: check if the messageHandler related transaction is success.
  10. checkResult = true;
  11. }catch(Exception e)
  12. {
  13. checkResult = false;
  14. }
  15. return checkResult;
  16. }
  17. }
  18. public class MyTransactionOperations implements TransactionOperations
  19. {
  20. public boolean doTransaction(Message message)
  21. {
  22. boolean transactionResult = false;
  23. String messageHandler = message.getReceiptHandle();
  24. String messageBody = message.getMessageBody();
  25. try{
  26. //TODO: do your local transaction according to the messageHandler and messageBody here.
  27. transactionResult = true;
  28. }catch(Exception e)
  29. {
  30. transactionResult = false;
  31. }
  32. return transactionResult;
  33. }
  34. }
  35. public static void main(String[] args) {
  36. System.out.println("Start TransactionMessageDemo");
  37. String transQueueName = "transQueueName";
  38. String accessKeyId = ServiceSettings.getMNSAccessKeyId();
  39. String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
  40. String endpoint = ServiceSettings.getMNSAccountEndpoint();
  41. CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
  42. MNSClient client = account.getMNSClient(); //this client need only initialize once
  43. // create queue for transaction queue.
  44. QueueMeta queueMeta = new QueueMeta();
  45. queueMeta.setQueueName(transQueueName);
  46. queueMeta.setPollingWaitSeconds(15);
  47. TransactionMessageDemo demo = new TransactionMessageDemo();
  48. TransactionChecker transChecker = demo.new MyTransactionChecker();
  49. TransactionOperations transOperations = demo.new MyTransactionOperations();
  50. TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker);
  51. // do transaction.
  52. Message msg = new Message();
  53. String messageBody = "TransactionMessageDemo";
  54. msg.setMessageBody(messageBody);
  55. transQueue.sendTransMessage(msg, transOperations);
  56. // delete queue and close client if we won't use them.
  57. transQueue.delete();
  58. // close the client at the end.
  59. client.close();
  60. System.out.println("End TransactionMessageDemo");
  61. }
  62. }