コンシューマライブラリ

最終更新日: Feb 15, 2018

コンシューマーライブラリは、自動シャード割り当ての高度なモードであり、リアルタイム消費を抽象化して管理する connsumerGroup の概念を提供します。 Spark StreamingStorm を使用した LogHub ログの消耗 は、ConsumerGroupをベースモデルとしています。

基本概念

LogHub コンシューマライブラリには、コンシューマグループ、コンシューマ、ハートビート、チェックポイントの 4 つの重要な概念があります。

  • コンシューマグループ

    コンシューマグループは、ログストアのサブ・リソースです。同じコンシューマグループ名を持つコンシューマは、同じログストアからのデータを使用しますが、各コンシューマが使用するデータは異なります。1 つの Logstore で最大 10 つのコンシューマグループを作成できますが、グループ名はログストア内で一意でなければなりません。同じログストアの下にある異なるコンシューマグループは、データを独立して消費します。

    1. {
    2. "order":boolean,
    3. "timeout": integer
    4. }
    • order: 書き込み時間に基づいて同一のキーを持つデータを順番に使用するかどうかを示します。
    • timeout: コンシューマグループのコンシューマのタイムアウト時間(秒単位)を示します。コンシューマが必要なタイムアウト時間内にハートビートパケットを送信しない場合、コンシューマはタイムアウトしたとみなされ、ログサービスはコンシューマがオフラインになったと判断します。
  • コンシューマ

    各コンシューマには複数のシャードが割り当てられ、これらのシャードのデータを使用することができます。同じコンシューマグループのコンシューマは、一意の名前を持つ必要があります。

  • ハートビート

    コンシューマはキープアライブハートビートパケットを定期的にログサービスに送信する必要があります。

  • チェックポイント

    コンシューマは定期的にシャード使用のエンドポイントを Log Service に保存する必要があります。シャードがあるコンシューマから別のコンシューマに転送された後、ターゲットコンシューマは、ログサービスからシャード使用ブレークポイントを取得し、ブレークポイントからデータを使用します。

構造

それらの関係は以下のように説明されます。

1

シナリオ

LogHub コンシューマライブラリは、複数の LogHub コンシューマが同時にログストアを使用する場合の自動シャード割り当ての高度なモードです。

たとえば、LogHub コンシューマライブラリは、Storm と Spark の複数のコンシューマ間のシャードロードバランシングロジックとコンシューマフェールオーバーを自動的に処理します。シャード割り当て、チェックポイント、フェールオーバーを気にすることなく、ビジネス開発に集中することができます。

たとえば、ストーム・コンピューティング・ストーム(Storm Compute through Storm)では、3 つの使用インスタンス A、B、および C が使用可能になっています。シャードが 10 個ある場合、システムは A と B にそれぞれ3個のシャードを割り当て、C に 4 個のシャードを割り当てます。次に、

  • A がダウンすると、システムは負荷分散モードで A と B、C に使用されなかったデータを割り当てます。A が回復すると、システムはA、B、および C の負荷を均衡させます。
  • D と E という 2 つの使用インスタンスを追加すると、各インスタンスが 2 つのシャードを使用するように負荷分散が実行されます。
  • シャードがマージまたは分割されている場合、システムはロードバランシングを実行して、マージされたシャードまたは分割されたシャードを考慮に入れます。
  • 読み取り専用シャードが使用された後、残りのシャードを考慮してロードバランシングが実行されます。

前のプロセスでデータが失われたり重複したりすることはありません。次のコーディングを完了する必要があります:

  1. 設定パラメータを設定します。
  2. ログ処理コードを記述します。
  3. 使用インスタンスを有効にする。

LogHub コンシューマライブラリを使用すると、ロードバランシング、使用ブレークポイントの保存、順次使用、使用例外処理などの問題を意識することなく、データ処理に集中できます。したがって、データ使用には LogHub コンシューマライブラリが強く推奨されます。

ベストプラクティス:処理 - ConsumerLib の使用

ステータスとアラーム

使用ポイントをリセットする

いくつかのシナリオ(データを入力して計算を繰り返す)では、コンシューマグループポイントを特定の時点に設定して、現在のコンシューマ・グループが新しいポイントから使用を開始できるようにする必要があります。2 つの方法があります。

  1. コンシューマグループを削除する
    • コンソールでコンシューマグループを削除し、コンシューマグループプログラムを再起動する。
    • コンシューマグループのプログラムは、デフォルトの開始点から使用を開始する(プログラムによって設定される)。
  2. SDK を使用して、現在のコンシューマ・グループを特定の時点にリセットする。

    • プログラムと Java コードの例は次のとおりです。

      1. Client client = new Client(host, accessId, accessKey);
      2. long time_stamp = Timestamp.valueOf("2017-08-15 00:00:00").getTime() / 1000;
      3. ListShardResponse shard_res = client.ListShard(new ListShardRequest(project, logStore));
      4. ArrayList<Shard> all_shards = shard_res.GetShards();
      5. for (Shard shard: all_shards)
      6. {
      7. shardId = shard.GetShardId();
      8. long cursor_time = time_stamp;
      9. String cursor = client.GetCursor(project, logStore, shardId, cursor_time).GetCursor();
      10. client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
      11. }

使用上のガイドライン

Spark StreamingStorm を使用した LogHub ログの消耗 では、ConsumerGroupをベースモデルとしています。これらのクライアントは、コンシューマライブラリのすべての機能をサポートしています。

Java:コンシューマライブラリのインタフェースに基づいて

  1. LogHub コンシューマライブラリの次の 2 つのインターフェイスクラスを実装します。

    • ILogHubProcessor:各シャードはインスタンスに対応し、各インスタンスは特定のシャードのデータを使用します。
    • ILogHubProcessorFactory: ILogHubProcessor インターフェイスを実装するインスタンスを生成します。
  2. パラメータを設定します。

  3. 1 つ以上の ClientWorker インスタンスを有効にします。

Maven アドレス

  1. <dependency>
  2. <groupId>com.google.protobuf</groupId>
  3. <artifactId>protobuf-java</artifactId>
  4. <version>2.5.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.aliyun.openservices</groupId>
  8. <artifactId>aliyun-log</artifactId>
  9. <version>0.6.11</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>com.aliyun.openservices</groupId>
  13. <artifactId>loghub-client-lib</artifactId>
  14. <version>0.6.15</version>
  15. </dependency>

main 関数

  1. public static void main(String args[])
  2. {
  3. LogHubConfig config = new LogHubConfig(...);
  4. ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
  5. Thread thread = new Thread(worker);
  6. //The ClientWorker instance runs automatically after the thread is executed and extends the Runnable interface.
  7. thread.start();
  8. Thread.sleep(60 * 60 * 1000);
  9. //The shutdown function of the ClientWorker instance is called to exit the consumption instance. The associated thread is stopped automatically.
  10. worker.shutdown();
  11. //Multiple asynchronous tasks are generated when the ClientWorker instance is running. You are advised to wait 30s until all tasks are exited after shutdown.
  12. Thread.sleep(30 * 1000);
  13. }

ILogHubProcessor および ILogHubProcessorFactory 実装サンプル

ILogHubProcessor は、特定のシャードに対応する使用インスタンスを示します。開発プロセス中のデータ使用ロジックに注意してください。同じ ClientWorker インスタンスはデータをシリアルに使用し、1 つの ILogHubProcessor インスタンスのみが生成されます。ClientWorker インスタンスは、終了時に ILogHubProcessor の shutdown 関数を呼び出します。

  1. public class SampleLogHubProcessor implements ILogHubProcessor
  2. {
  3. private int mShardId;
  4. // Records the last persistent checkpoint time.
  5. private long mLastCheckTime = 0;
  6. public void initialize(int shardId)
  7. {
  8. mShardId = shardId;
  9. }
  10. // Master logic of data consumption
  11. public String process(List<LogGroupData> logGroups,
  12. ILogHubCheckPointTracker checkPointTracker)
  13. {
  14. for(LogGroupData logGroup: logGroups){
  15. FastLogGroup flg = logGroup.GetFastLogGroup();
  16. System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
  17. flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
  18. System.out.println("Tags");
  19. for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
  20. FastLogTag logtag = flg.getLogTags(tagIdx);
  21. System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
  22. }
  23. for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
  24. FastLog log = flg.getLogs(lIdx);
  25. System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
  26. for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
  27. FastLogContent content = log.getContents(cIdx);
  28. System.out.println(content.getKey() + "\t:\t" + content.getValue());
  29. }
  30. }
  31. }
  32. long curTime = System.currentTimeMillis();
  33. // Writes checkpoints to the Log Service every 60s. If a ClientWorker instance crashes during the 60s period,
  34. // the new ClientWorker instance consumes data starting from the last checkpoint. Duplicate data may exist.
  35. if (curTime - mLastCheckTime > 60 * 1000)
  36. {
  37. try
  38. {
  39. //When the parameter is set to true, checkpoints are immediately updated to Log Service. When the parameter is set to false, checkpoints are locally cached. The default update interval is 60s.
  40. //The background updates checkpoints to Log Service.
  41. checkPointTracker.saveCheckPoint(true);
  42. }
  43. catch (LogHubCheckPointException e)
  44. {
  45. e.printStackTrace();
  46. }
  47. mLastCheckTime = curTime;
  48. }
  49. else
  50. {
  51. try
  52. {
  53. checkPointTracker.saveCheckPoint(false);
  54. }
  55. catch (LogHubCheckPointException e)
  56. {
  57. e.printStackTrace();
  58. }
  59. }
  60. // "null" indicates that data is properly processed. If you need to roll back to the last checkpoint for retry, you can return checkPointTracker.getCheckpoint().
  61. return null;
  62. }
  63. // The ClientWorker instance calls this function upon exit, during which you can perform cleanup.
  64. public void shutdown(ILogHubCheckPointTracker checkPointTracker)
  65. {
  66. //Saves the consumption breakpoint to Log Service.
  67. try {
  68. checkPointTracker.saveCheckPoint(true);
  69. } catch (LogHubCheckPointException e) {
  70. e.printStackTrace();
  71. }
  72. }
  73. }

ILogHubProcessorFactory は、ILogHubProcessor を生成するために使用されます。

  1. public class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
  2. {
  3. public ILogHubProcessor generatorProcessor()
  4. {
  5. // Generates a consumption instance.
  6. return new SampleLogHubProcessor();
  7. }
  8. }

instructions を設定する

  1. public class LogHubConfig
  2. {
  3. //Default interval when the ClientWorker instance pulls data.
  4. public static final long DEFAULT_DATA_FETCH_INTERVAL_MS = 200;
  5. //Name of a consumer group. The name can be 3 to 63 characters in length, and can include English letters, digits, underscores (_), and hyphens (-). It cannot be null and must begin and end with lowercase letters or numbers.
  6. private String mConsumerGroupName;
  7. //Name of a consumer. Consumers in the same consumer group must have unique names.
  8. private String mWorkerInstanceName;
  9. //Address of the LogHub data interface.
  10. private String mLogHubEndPoint;
  11. //Project name
  12. private String mProject;
  13. //LogStore name
  14. private String mLogStore;
  15. //Access key ID of the cloud account.
  16. private String mAccessId;
  17. //Access key of the cloud account.
  18. private String mAccessKey;
  19. //Indicates the consumption start point of a shard when the shard's checkpoint is not recorded in Log Service. The value does not take effect if Log Service records valid checkpoint information. mCursorPosition can be set to BEGIN_CURSOR, END_CURSOR, or SPECIAL_TIMER_CURSOR. BEGIN_CURSOR indicates that consumption starts from the first data entry of the shard. END_CURSOR indicates that consumption starts from the last data entry at the current time. SPECIAL_TIMER_CURSOR is used in conjunction with mLoghubCursorStartTime to indicate a specific data consumption start time.
  20. private LogHubCursorPosition mCursorPosition;
  21. //When mCursorPosition is set to SPECIAL_TIMER_CURSOR, data consumption will start at a specific time measured in seconds.
  22. private int mLoghubCursorStartTime = 0;
  23. // The interval (measured in milliseconds) of LogHub data acquisition by polling. The smaller the interval, the faster data is extracted. The default value is DEFAULT_DATA_FETCH_INTERVAL_MS. It is recommended to set the interval to a value greater than 200 ms.
  24. private long mDataFetchIntervalMillis;
  25. // The interval (measured in milliseconds) when the ClientWorker instance sends heartbeat packets to Log Service. The recommended value is 10,000 ms.
  26. private long mHeartBeatIntervalMillis;
  27. //Sequential consumption or not.
  28. private boolean mConsumeInOrder;
  29. }

使用上の注意

  • LogHubConfig のコンシューマグループ名は、コンシューマグループを示します。同じコンシューマグループ名を持つコンシューマは、ワーカーインスタンス名で区別されます。彼らは同じログストアのシャードを使用します。

    1. Assume that a LogStore has four shards numbered from 0 to 3.
    2. There are three ClientWorker instances with the following consumerGroupName and workerinstance name settings:
    3. <consumer_group_name_1 , worker_A>
    4. <consumer_group_name_1 , worker_B>
    5. <consumer_group_name_2 , worker_C>
    6. The ClientWorker instances are allocated with the following shards:
    7. <consumer_group_name_1 , worker_A>: shard_0, shard_1
    8. <consumer_group_name_1 , worker_B>: shard_2, shard_3
    9. <consumer_group_name_2 , worker_C>: Shard_0, shard_1, shard_2, shard_3 # The ClientWorker instances with different consumer group names consume data independently.
  • ILogHubProcessor process ()インターフェイスが正しく実装され、終了していることを確認します。

  • ILogHubCheckPointTrackerのsaveCheckPoint ()インターフェイスは、転送されたパラメータが true または false に設定されているかどうかにかかわらず、データ処理が完了したことを示します。このパラメータを true に設定すると、チェックポイントの永続性がログサービスですぐに実装されます。このパラメータを false に設定すると、チェックポイントは 60 秒ごとにログサービスに同期されます。

  • RAM 認証は、サブアカウントのアクセスキー ID とアクセスキーシークレットが LogHubConfig で設定されている場合に必要です。

アクション リソース
log:GetCursorOrData acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:CreateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ListConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:ConsumerGroupHeartBeat acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:GetConsumerGroupCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

Python:コンシューマライブラリのインタフェースに基づいて開発されました