状況
Alibaba Cloud Message Service で提供されるキューは、高信頼性、高可用性、高並行性を備えています。Alibaba Cloud Apsara 分散プラットフォームでは、各キューのデータは 3 つのコピーで永続的に書き込まれます。各キューには、サービスを提供する 2 つ以上のサーバーが含まれ、各サーバーは高並行性アクセスをサポートします。このような分散特性により、Message Service キューは、従来の単一ホストキューのようにメッセージの厳密な first-in-first-out (FIFO) 順を保証できず、ある程度の順序付けのみ可能です。
キューに複数のメッセージ送信者が存在する場合、厳密なメッセージ順序は意味がありません。これは、並行性やネットワーク遅延などが原因で、複数の送信者が実際にメッセージを送信した順序と、メッセージが実際にサーバーに到達する順序は不明だからです。同様に、複数の受信者が同時にメッセージを受信した場合、実際にメッセージが処理される順序は不明です。
したがって、メッセージ順序は、送信者 1 人 (単一スレッドまたは複数スレッド) と受信者 1 人のみが存在する場合に意味があり、メッセージの実際の送信順序と受信順序を把握し、記録することができます。
解決策
上記の仮定に基づき、次の解決策では、メッセージ順序に関するユーザーの要件を満たし、メッセージが送信された順序で受信、消費されるように設計されています。
手順:
送信者はメッセージを色付けし、メッセージに SeqID (#num# など) を追加します。
受信者はメッセージを復元し、SeqID 順にソートし、上位層に返します。メッセージが重複して消費されないように、受信したメッセージに対してバックエンドスレッドが提供されます。
送信者または受信者のエラーによる SeqID の損失を防ぐため、SeqID はローカルディスクファイルに永続的に保存されます(SeqID は、OSS、Table Store、ApsaraDB for RDS など、他のストレージデバイスやデータベースに保存可能です)。
プログラムの説明
添付は、Python バージョンの解決策です (Message Service Python SDK を使用)。この中で、OrderedQueueWrapper タイプの oredered_queue.py ファイルは、一般的な Message Service キューを、順序付けされたキューにラップするために提供されます。
OrderedQueueWrapper には、SendMessageInOrder() メソッドと ReceiveMessageInOrder() メソッドがあります。ReceiveMessageInOrder() はメッセージを色付けするために使用され、ReceiveMessageInOrder() はメッセージを復元し、順番に受信者に返すために使用されます。
また、send_message_in_order.py と receive_message_in_order.py は、送信者と受信者が OrderedQueueWrapper を使用するためのサンプルプログラムです。
send_message_in_order.py:
#init orderedQueue
seqIdConfig = {"localFileName":"/tmp/mns_send_message_seq_id"} # Specifies the disk file to persistently send the SeqID.
seqIdPS = LocalDiskStorage(seqIdConfig)
orderedQueue = OrderedQueueWrapper(myQueue, sendSeqIdPersistStorage = seqIdPS)
orderedQueue.SendMessageInOrder(message)
receive_message_in_order.py:
#init orderedQueue
seqIdConfig = {"localFileName":"/tmp/mns_receive_message_seq_id"} #Specifies the disk file to persistently receive the SeqID.
seqIdPS = LocalDiskStorage(seqIdConfig)
orderedQueue = OrderedQueueWrapper(myQueue, receiveSeqIdPersistStorage = seqIdPS)
recv_msg = orderedQueue.ReceiveMessageInOrder(wait_seconds)
実行方法:
send_message_in_order.py と receive_message_in_order.py で、g_endpoint、g_accessKeyId、g_accessKeySecret、g_testQueueName を設定します。
send_message_in_order.py を実行します。
receive_message_in_order.py を実行します (このステップは、ステップ 2 を完了しなくても実行可能です)。送信者は 20 件のメッセージを送信し、受信者は 20 件のメッセージを順番に受信します。
oredered_queue.py のテストケースを実行して、順序付けされたキューと、一般的な Message Service キューを比較することもできます (エンドポイントと AK を設定する必要があります)。
コマンド:$python oredered_queue.py
厳密に順序付けされていない:(メッセージ全体が順序付けされますが、一部のメッセージは順序が乱れています。これは、単一の Message Service キュー内に複数のサーバーが同時にサービスを提供することを示します。)
厳密に順序付けされている:
注意:
このトピックの主な目的は、メッセージの順序付けの解決策を提供することです。このトピックのコードは、厳密なテストを実施していません。このコードをテストせず、本番環境でそのまま使用することは推奨しません。また、このプログラムにはエラーが含まれている可能性があります。訂正がある場合は、ご連絡ください。
通常の状態では、送信者と受信者の SeqID は、キュー内の (色付き) メッセージの SeqID と一致します。再作成のためにキューを削除する場合、ディスクファイルの SeqID がキュー内の SeqID と一致していることを確認してください。色付きのメッセージを含むキューに色なしのメッセージを送信することは推奨しません。
キュー内のショートメッセージ検証期間と各メッセージの実際の処理結果は、メッセージの順序に影響する場合があります。不規則な順番のメッセージは、プログラムで処理する必要があります。
サンプルプログラムのダウンロード: