すべてのプロダクト
Search
ドキュメントセンター

:厳密な FIFO キュー

最終更新日:Mar 22, 2020

状況

Alibaba Cloud Message Service で提供されるキューは、高信頼性、高可用性、高並行性を備えています。Alibaba Cloud Apsara 分散プラットフォームでは、各キューのデータは 3 つのコピーで永続的に書き込まれます。各キューには、サービスを提供する 2 つ以上のサーバーが含まれ、各サーバーは高並行性アクセスをサポートします。このような分散特性により、Message Service キューは、従来の単一ホストキューのようにメッセージの厳密な first-in-first-out (FIFO) 順を保証できず、ある程度の順序付けのみ可能です。

キューに複数のメッセージ送信者が存在する場合、厳密なメッセージ順序は意味がありません。これは、並行性やネットワーク遅延などが原因で、複数の送信者が実際にメッセージを送信した順序と、メッセージが実際にサーバーに到達する順序は不明だからです。同様に、複数の受信者が同時にメッセージを受信した場合、実際にメッセージが処理される順序は不明です。

したがって、メッセージ順序は、送信者 1 人 (単一スレッドまたは複数スレッド) と受信者 1 人のみが存在する場合に意味があり、メッセージの実際の送信順序と受信順序を把握し、記録することができます。

解決策

上記の仮定に基づき、次の解決策では、メッセージ順序に関するユーザーの要件を満たし、メッセージが送信された順序で受信、消費されるように設計されています。

手順

  1. 送信者はメッセージを色付けし、メッセージに SeqID (#num# など) を追加します。

  2. 受信者はメッセージを復元し、SeqID 順にソートし、上位層に返します。メッセージが重複して消費されないように、受信したメッセージに対してバックエンドスレッドが提供されます。

  3. 送信者または受信者のエラーによる SeqID の損失を防ぐため、SeqID はローカルディスクファイルに永続的に保存されます(SeqID は、OSS、Table Store、ApsaraDB for RDS など、他のストレージデバイスやデータベースに保存可能です)。

FIFO キュー

プログラムの説明

添付は、Python バージョンの解決策です (Message Service Python SDK を使用)。この中で、OrderedQueueWrapper タイプの oredered_queue.py ファイルは、一般的な Message Service キューを、順序付けされたキューにラップするために提供されます。

OrderedQueueWrapper には、SendMessageInOrder() メソッドと ReceiveMessageInOrder() メソッドがあります。ReceiveMessageInOrder() はメッセージを色付けするために使用され、ReceiveMessageInOrder() はメッセージを復元し、順番に受信者に返すために使用されます。

また、send_message_in_order.py と receive_message_in_order.py は、送信者と受信者が OrderedQueueWrapper を使用するためのサンプルプログラムです。

  1. send_message_in_order.py
  2. #init orderedQueue
  3. seqIdConfig = {"localFileName":"/tmp/mns_send_message_seq_id"} # Specifies the disk file to persistently send the SeqID.
  4. seqIdPS = LocalDiskStorage(seqIdConfig)
  5. orderedQueue = OrderedQueueWrapper(myQueue, sendSeqIdPersistStorage = seqIdPS)
  6. orderedQueue.SendMessageInOrder(message)
  7. receive_message_in_order.py
  8. #init orderedQueue
  9. seqIdConfig = {"localFileName":"/tmp/mns_receive_message_seq_id"} #Specifies the disk file to persistently receive the SeqID.
  10. seqIdPS = LocalDiskStorage(seqIdConfig)
  11. orderedQueue = OrderedQueueWrapper(myQueue, receiveSeqIdPersistStorage = seqIdPS)
  12. recv_msg = orderedQueue.ReceiveMessageInOrder(wait_seconds)

実行方法

  1. send_message_in_order.py と receive_message_in_order.py で、g_endpoint、g_accessKeyId、g_accessKeySecret、g_testQueueName を設定します。

  2. send_message_in_order.py を実行します。

  3. receive_message_in_order.py を実行します (このステップは、ステップ 2 を完了しなくても実行可能です)。送信者は 20 件のメッセージを送信し、受信者は 20 件のメッセージを順番に受信します。

スクリーンショット 1

oredered_queue.py のテストケースを実行して、順序付けされたキューと、一般的な Message Service キューを比較することもできます (エンドポイントと AK を設定する必要があります)。

コマンド:$python oredered_queue.py

厳密に順序付けされていない:(メッセージ全体が順序付けされますが、一部のメッセージは順序が乱れています。これは、単一の Message Service キュー内に複数のサーバーが同時にサービスを提供することを示します。)

スクリーンショット 2

厳密に順序付けされている:

スクリーンショット 3

注意

  1. このトピックの主な目的は、メッセージの順序付けの解決策を提供することです。このトピックのコードは、厳密なテストを実施していません。このコードをテストせず、本番環境でそのまま使用することは推奨しません。また、このプログラムにはエラーが含まれている可能性があります。訂正がある場合は、ご連絡ください。

  2. 通常の状態では、送信者と受信者の SeqID は、キュー内の (色付き) メッセージの SeqID と一致します。再作成のためにキューを削除する場合、ディスクファイルの SeqID がキュー内の SeqID と一致していることを確認してください。色付きのメッセージを含むキューに色なしのメッセージを送信することは推奨しません。

  3. キュー内のショートメッセージ検証期間と各メッセージの実際の処理結果は、メッセージの順序に影響する場合があります。不規則な順番のメッセージは、プログラムで処理する必要があります。

サンプルプログラムのダウンロード

順序付けされたキュー用の Python サンプルコード