コンシューマーライブラリ は、リアルタイムにデータを読み込む高度なモードで、複数のコンシューマーインスタンスによる Logstore 読み込みは自動的に負荷分散されます。Spark Streaming および Storm は、基本的にコンシューマーグループを使用します。

コンソールに読み込み状況を表示

  1. Log Service コンソールにログインします。
  2. プロジェクト一覧ページでプロジェクト名をクリックします。
  3. 左側のナビゲーションメニューより、LogHub - コンシューマー> > コンシューマーグループをクリックします。
  4. コンシューマーグループページで Logstore を選択し、コンシューマーグループ機能が有効かどうかを確認します。
    図 1. コンシューマーグループ


  5. コンシューマーグループの右にあるステータスをクリックすると、各シャードのデータ読み込み状況が表示されます。
    図 2. 読み込み状況


    上図のとおり、Logstore には 6 つのシャードがあり、3 つのコンシューマーが紐づいています。各コンシューマーがデータを最後に読み込んだ時間は 2 列目に表示されます。データが読み込まれた時間から、データのデータ読み込みがデータ生成に追いついているかどうかがわかります。データ読み込みが大幅に遅れを取っている場合 (つまり、データの生成よりデータの読み込みが遅い場合)、コンシューマーの数を増やすことをお勧めします。

API/SDK より読み込みの進行状況を表示

以下のコマンドは Java SDK の使用例です。API より読み込み状況を取得しています。

package test;
import java.util.ArrayList;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
    static String endpoint = "";
    static String project = "";
    static String logstore = "";
    static String accesskeyId = "";
    static String accesskey = "";
    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accesskeyId, accesskey);
        //Logstore 内のコンシューマーグループをすべて取得します。コンシューマーグループがない場合、consumerGroups の長さは 0 です。
        ArrayList<ConsumerGroup> consumerGroups;
        try{
            consumerGroups = client.ListConsumerGroup(project, logstore). GetConsumerGroups();
        
        catch(LogException e){
            if(e.GetErrorCode() == "LogStoreNotExist")
                System.out.println("this logstore does not have any consumer group");
            else{
                //内部サーバーエラー分岐
            
            return;
        
        for(ConsumerGroup c: consumerGroups){
            //Print consumer group properties, including names, heartbeat timeout, and whether or not the consumption is in order.
                       System.out.println("名前:" + c.getConsumerGroupName());
            System.out.println("ハートビートのタイムアウト:" + c.getTimeout());
            System.out.println("読み込み順序" + c.isInOrder());
            for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()). GetCheckPoints()){
                System.out.println("shard: " + cp.getShard());
                //フォーマットします。正確な時間がミリ秒単位 (Long 型) で返されます。
                System.out.println("最終読み込み時間:" + cp.getUpdateTime());
                String consumerPrg = "";
                if(cp.getCheckPoint().isEmpty())
                    consumerPrg = "読み込みはまだ開始されていません。";
                else{
                    //UNIX タイムスタンプ (単位: 秒)。出力前に値を初期化。
                    try{
                        int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()). GetCursorTime();
                        consumerPrg = "" + prg;
                    
                    catch(LogException e){
                        if(e.GetErrorCode() == "InvalidCursor")
                            consumerPrg = "無効。最後の読み込み時間が、Logstore のデータライフサイクルを超えました。";
                        else{
                            //内部サーバーエラー
                            throw e;
                        
                    
                
                System.out.println("読み込みの進行状況:" + consumerPrg);
                String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END). GetCursor();
                int endPrg = 0;
                try{
                    endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor). GetCursorTime();
                
                catch(LogException e){
                    //do nothing
                
                //UNIX タイムスタンプ (単位: 秒)。出力前に値を初期化。
                                System.out.println("最終データの受け取り時間:" + endPrg);