メッセージサービスベースのトランザクションメッセージを実装する方法
状況
ローカル操作は、メッセージ送信トランザクションと一貫している必要があります。つまり、メッセージが正常に送信された場合、ローカル操作は成功します。メッセージの送信に失敗した場合、ローカル操作は失敗します(メッセージは正常に送信されたとしてもロールバックする必要があります)。これにより、操作が成功するが、メッセージの送信に失敗したり、操作が失敗してもメッセージは正常に送信されたりすることはありません。
さらに、コンシューマプログラムのクラッシュに起因するメッセージ処理が失敗した結果、コンシューマの進行状況を手動でリセットしないように、コンシューマエンドでメッセージを1回正常に処理する必要があります。
解決策
メッセージサービスのメッセージ遅延機能を使用して、トランザクションメッセージを実装することができます。
準備
2つのキューを作成します。
トランザクションメッセージキュー
メッセージの有効期間は、メッセージの遅延時間よりも短くなっています。つまり、プロデューサがメッセージの可視時間を積極的に変更(送信)しない場合、メッセージはコンシューマには見えません。
操作ログキュー
操作ログ・キューには、トランザクション・メッセージの操作が記録されます。メッセージ遅延時間は、トランザクション操作のタイムアウト時間です。ログキュー内のメッセージが確認(削除)された後、そのメッセージはコンシューマには見えません。
手順
プロデューサは、トランザクション準備メッセージをトランザクションメッセージキューに送信します。
プロデューサは、操作ログ・メッセージを操作ログ・キューに書き込みます。ログには、ステップ1のメッセージのハンドルが含まれています。
プロデューサはローカルトランザクション操作を実行します。
ステップ3の操作が成功すると、プロデューサはメッセージを送信します(これはコンシューマに表示されます)。それ以外の場合、プロデューサはメッセージをロールバックします。
プロデューサは、ステップ2で操作ログを確認します(およびログメッセージを削除します)。
ステップ4が完了すると、コンシューマはトランザクションメッセージを受信できます。
コンシューマはメッセージを処理する。
コンシューマは、メッセージを確認して削除する。
次の図を参照してください。
例外解析:
プロデューサの例外(例:プロセスの再起動)
A. 操作ログキューのタイムアウトにより確認されなかったログを読み取ります。
B. 取引結果を確認する。
C. チェックの結果、トランザクションが成功したことが示された場合は、メッセージを送信します。(繰り返し送信されたメッセージはシステムには影響しません。同じハンドルを持つメッセージは、1度だけ正常に送信できます)。
D. 操作ログを確認します。
コンシューマの例外(例:プロセスの再起動)
メッセージは少なくとも1回コンシューマで処理されなければなりません。ステップ8が失敗した場合、メッセージは一定期間後にコンシューマに表示され、現在または別のコンシューマによって処理されます。
到達不能メッセージサービス(例:ネットワーク切断)
メッセージの送受信状態と操作ログはMessage Serviceに保存され、高い信頼性と高い可用性を備えています。ネットワーク接続が復旧した後、トランザクションを継続的に実装して、操作とメッセージ送信トランザクションとの一貫性を確保することができます。
コードの実装:
最新のMessage Service Java SDK(1.1.5)のTransactionQueueは、前述のトランザクションメッセージソリューションをサポートしています。TransactionOperationsとTransactionCheckerにサービス操作とチェックロジックを追加することで、簡単にトランザクションメッセージを実装できます。
デモコード
public class TransactionMessageDemo{
public class MyTransactionChecker implements TransactionChecker
{
public boolean checkTransactionStatus(Message message)
{
boolean checkResult = false;
String messageHandler = message.getReceiptHandle();
try{
//TODO: check if the messageHandler related transaction is success.
checkResult = true;
}catch(Exception e)
{
checkResult = false;
}
return checkResult;
}
}
public class MyTransactionOperations implements TransactionOperations
{
public boolean doTransaction(Message message)
{
boolean transactionResult = false;
String messageHandler = message.getReceiptHandle();
String messageBody = message.getMessageBody();
try{
//TODO: do your local transaction according to the messageHandler and messageBody here.
transactionResult = true;
}catch(Exception e)
{
transactionResult = false;
}
return transactionResult;
}
}
public static void main(String[] args) {
System.out.println("Start TransactionMessageDemo");
String transQueueName = "transQueueName";
String accessKeyId = ServiceSettings.getMNSAccessKeyId();
String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
String endpoint = ServiceSettings.getMNSAccountEndpoint();
CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
MNSClient client = account.getMNSClient(); //this client need only initialize once
// create queue for transaction queue.
QueueMeta queueMeta = new QueueMeta();
queueMeta.setQueueName(transQueueName);
queueMeta.setPollingWaitSeconds(15);
TransactionMessageDemo demo = new TransactionMessageDemo();
TransactionChecker transChecker = demo.new MyTransactionChecker();
TransactionOperations transOperations = demo.new MyTransactionOperations();
TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker);
// do transaction.
Message msg = new Message();
String messageBody = "TransactionMessageDemo";
msg.setMessageBody(messageBody);
transQueue.sendTransMessage(msg, transOperations);
// delete queue and close client if we won't use them.
transQueue.delete();
// close the client at the end.
client.close();
System.out.println("End TransactionMessageDemo");
}
}