コンシューマーライブラリ は、リアルタイムにデータを読み込む高度なモードで、複数のコンシューマーインスタンスによる Logstore 読み込みは自動的に負荷分散されます。Spark Streaming および Storm は、基本的にコンシューマーグループを使用します。
コンソールに読み込み状況を表示
- Log Service コンソールにログインします。
- プロジェクト一覧ページでプロジェクト名をクリックします。
- 左側のナビゲーションメニューより、 をクリックします。
- コンシューマーグループページで Logstore を選択し、コンシューマーグループ機能が有効かどうかを確認します。
図 1. コンシューマーグループ
- コンシューマーグループの右にあるステータスをクリックすると、各シャードのデータ読み込み状況が表示されます。
図 2. 読み込み状況
上図のとおり、Logstore には 6 つのシャードがあり、3 つのコンシューマーが紐づいています。各コンシューマーがデータを最後に読み込んだ時間は 2 列目に表示されます。データが読み込まれた時間から、データのデータ読み込みがデータ生成に追いついているかどうかがわかります。データ読み込みが大幅に遅れを取っている場合 (つまり、データの生成よりデータの読み込みが遅い場合)、コンシューマーの数を増やすことをお勧めします。
API/SDK より読み込みの進行状況を表示
以下のコマンドは Java SDK の使用例です。API より読み込み状況を取得しています。
package test;
import java.util.ArrayList;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
static String endpoint = "";
static String project = "";
static String logstore = "";
static String accesskeyId = "";
static String accesskey = "";
public static void main(String[] args) throws LogException {
Client client = new Client(endpoint, accesskeyId, accesskey);
//Logstore 内のコンシューマーグループをすべて取得します。コンシューマーグループがない場合、consumerGroups の長さは 0 です。
ArrayList<ConsumerGroup> consumerGroups;
try{
consumerGroups = client.ListConsumerGroup(project, logstore). GetConsumerGroups();
catch(LogException e){
if(e.GetErrorCode() == "LogStoreNotExist")
System.out.println("this logstore does not have any consumer group");
else{
//内部サーバーエラー分岐
return;
for(ConsumerGroup c: consumerGroups){
//Print consumer group properties, including names, heartbeat timeout, and whether or not the consumption is in order.
System.out.println("名前:" + c.getConsumerGroupName());
System.out.println("ハートビートのタイムアウト:" + c.getTimeout());
System.out.println("読み込み順序" + c.isInOrder());
for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()). GetCheckPoints()){
System.out.println("shard: " + cp.getShard());
//フォーマットします。正確な時間がミリ秒単位 (Long 型) で返されます。
System.out.println("最終読み込み時間:" + cp.getUpdateTime());
String consumerPrg = "";
if(cp.getCheckPoint().isEmpty())
consumerPrg = "読み込みはまだ開始されていません。";
else{
//UNIX タイムスタンプ (単位: 秒)。出力前に値を初期化。
try{
int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()). GetCursorTime();
consumerPrg = "" + prg;
catch(LogException e){
if(e.GetErrorCode() == "InvalidCursor")
consumerPrg = "無効。最後の読み込み時間が、Logstore のデータライフサイクルを超えました。";
else{
//内部サーバーエラー
throw e;
System.out.println("読み込みの進行状況:" + consumerPrg);
String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END). GetCursor();
int endPrg = 0;
try{
endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor). GetCursorTime();
catch(LogException e){
//do nothing
//UNIX タイムスタンプ (単位: 秒)。出力前に値を初期化。
System.out.println("最終データの受け取り時間:" + endPrg);