コンシューマーライブラリは、Log Service にログを読み込むための高度なモードです。コンシューマーグループ概念に基づき、コンシューマーをまとめて管理します。SDK を使用して直接データを読み取るときのように、Log Service 実装の知識は必要はありません。また、コンシューマー間の負荷分散、フェイルオーバーといったことも気にする必要がありません。コンシューマーライブラリを使用することで、サービスロジックに専念できます。
Spark Streaming、Storm および Flink Connector の実装にコンシューマーライブラリを使用します。
基本概念
- コンシューマーグループ
コンシューマーグループは複数のコンシューマーで構成されます。コンシューマーグループ内のコンシューマーは、同じ Logstore のデータを読み込みます。コンシューマーごとに別々のデータが読み込まれます。
- コンシューマー
コンシューマーは、データを読み込む、コンシューマーグループを構成する単位です。コンシューマー名は、コンシューマーグループ内で一意である必要があります。
- 各シャードに割り当てることのできるコンシューマーは 1 つのみ
- 各コンシューマーには、複数のシャードを紐づけることができる
コンシューマーグループに新たにコンシューマーを追加すると、読み込みが負荷分散されるようコンシューマーグループに紐づいているシャードは調整されます。 ただし、上記の割り当てルールは変わりません。割り当て処理はユーザーには見えません。
コンシューマーライブラリにもチェックポイントを保存することができます。したがって、コンシューマーはプログラムの例外が解決された後にブレークポイントから始まるデータを読み込み、データが一度だけ読み込まれるようにすることができます。
使用法
Maven ライブラリを追加
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.16</version>
</dependency>
public class Main {
// Log Service のドメイン名を指定
private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
// Log Service のプロジェクト名を指定
private static String sProject = "ali-cn-hangzhou-sls-admin";
// Log Service の Logstore 名を指定
private static String sLogstore = "sls_operation_log";
// コンシューマーグループ名を指定
private static String sConsumerGroup = "consumerGroupX";
// データ読み込みの AccessKey を指定
private static String sAccessKeyId = "";
private static String sAccessKey = "";
public static void main(String []args) throws LogHubClientWorkerException, InterruptedException
{
// 2 つ目のパラメータはコンシューマー名です。コンシューマーグループ内のコンシューマー名はコンシューマーグループ内で一意である必要があります。ただし、コンシューマーグループ名は重複しても構いません。各コンシューマーは複数のマシンの複数の処理を負荷分散方式で読み込み開始します。 コンシューマー名はマシンの IP アドレス順に並べられます。9 番目には、maxFetchLogGroupSize パラメータを指定します (maxFetchLogGroupSize は、Log Service が 1 回に取得する Logstore の数ですが、初期値をそのままご使用ください。変更する場合には、1 以上 1000 以下にします)。
LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
//スレッドの実行が開始し、実行可能な API を extend すると ClientWorker が自動的に起動されます。
thread.start();
Thread.sleep(60 * 60 * 1000);
//worker の Shutdown 関数を呼び出して読み込みインスタンスを 終了します。紐づいているスレッドは自動停止します。
worker.shutdown();
//ClientWorker 実行中に複数の非同期タスクが生成されます。シャットダウンしたら、実行タスクが終了するまで 30 秒待機することを推奨します。
Thread.sleep(30 * 1000);
}
}
public class SampleLogHubProcessor implements ILogHubProcessor
{
private int mShardId;
// 最後に残ったチェックポイント時間を記録します。
private long mLastCheckTime = 0;
public void initialize(int shardId)
{
mShardId = shardId;
}
// データ読み込みの主要部。例外はすべてキャッチされますが、キャッチされた例外は返されません。
public String process(List<LogGroupData> logGroups,
ILogHubCheckPointTracker checkPointTracker)
{
// Write checkpoint to30 秒ごとに Log Service にチェックポイントを書き込みます。30 秒以内に worker がクラッシュすると、新たにクラッシュすると、新たに開始した worker が最終チェックポイントよりデータの読み込みを開始します。わずかに重複データが含まれる場合もあります。
for(LogGroupData logGroup: logGroups){
FastLogGroup flg = logGroup.GetFastLogGroup();
System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
System.out.println("Tags");
for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
FastLogTag logtag = flg.getLogTags(tagIdx);
System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
}
for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
FastLog log = flg.getLogs(lIdx);
System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
FastLogContent content = log.getContents(cIdx);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
// 30 秒ごとに Log Service にチェックポイントを書き込みます。30 秒以内に worker がクラッシュする場合、
// 新たに起動した worker が最終チェックポイントから、データ読み込みを開始します。わずかに重複データが含まれる可能性があります。
if (curTime - mLastCheckTime > 30 * 1000)
{
try
{
//パラメータが true の場合、Log Service のチェックポイントが更新されます。パラメータが false の場合、チェックポイントはローカルマシンにキャッシュされ、Log Service にデフォルトで 60 秒ごとに Log Service のチェックポイントが更新されます。
checkPointTracker.saveCheckPoint(true);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
mLastCheckTime = curTime;
}
return null;
}
// 終了する際、worker は本関数を呼び出します。ここできれいにします (cleanup)。
public void shutdown(ILogHubCheckPointTracker checkPointTracker)
{
//Log Service の読み込み中断位置の保存
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
}
}
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
{
public ILogHubProcessor generatorProcessor()
{
// 読み込みインスタンスの生成
return new SampleLogHubProcessor();
}
}
上記のコードを実行して、すべてのデータを Logstore に書き込みます。1 つの Logstore を複数のコンシューマーで読み込むことのできるよう、コード内の説明に従ってプログラムを書き換え、同じコンシューマーグループ名と異なるコンシューマー名を使用して、読み込み処理を実行します。
制限と例外
各 Logstore には、コンシューマーグループを最大 10 個作成できます。上限を超えると、ConsumerGroupQuotaExceed
エラーが報告されます。
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
log4j.rootLogger = info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
ステータスとアラーム
高度な構成
通常は、上記のプログラムでデータを読み込みますが、高度な設定は以下のとおりです。
-
特定の時間に始まるデータを読み込む
上記のコードの LogHubConfig には、コンストラクタが 2 つあります。// consumerStartTimeInSeconds パラメータは、データが読み込まれてからの時間を1970以降の秒数を示します。 public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, int consumerStartTimeInSeconds); // Position is an enumeration variable, loghubconfig.glaseposition.begin_cursor は、最も古いデータから読み込みを開始することを示します。loghubconfig.glaseposition.end_cursor は、最新のデータから読み込みが開始することを示します。 public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, ConsumePosition position);
コンシューマーは必要に応じてさまざまに構築できますが、サーバーがチェックポイントで保存されている場合、読み込み開始位置はサーバーに保存されているチェックポイントに基づきます。
-
RAM を利用した Log Service へのアクセス
コンシューマーグループに関連する RAM 許可を設定し、RAM のドキュメントを参照するようにメソッドを設定する必要があります。設定する必要がある権限は以下の通りです。
操作 | リソース |
---|---|
log:GetCursorOrData | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
log:CreateConsumerGroup | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ListConsumerGroup | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ConsumerGroupUpdateCheckPoint | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ConsumerGroupHeartBeat | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:UpdateConsumerGroup | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:GetConsumerGroupCheckPoint | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
-
読み込み位置をリセットする
いくつかのシナリオ (データの入力、繰り返し処理) では、ConsumerGroup ポイントを特定の時点に設定して、現在のコンシューマーグループが新しい時点から読み込まれるようにする必要があります。 2 つの方法があります。
- コンシューマーグループを削除する
- コンソールでコンシューマーグループを削除し、コンシューマーグループプログラムを再起動します。
- コンシューマーグループ・プログラムは、デフォルトの開始点から読み込みを開始します(プログラムによって構成されます)。
- SDK を使用して現在のコンシューマーグループを特定の時点にリセットする
- Java のコード例は次のとおりです。
- SDK を使用してサイトを変更してからコンシューマープログラムを再起動します。
Client client = new Client(host, accessId, accessKey); long time_stamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000; ListShardResponse shard_res = client.ListShard(new ListShardRequest(project, logStore)); ArrayList<Shard> all_shards = shard_res.GetShards(); for (Shard shard: all_shards) { shardId = shard.GetShardId(); long cursor_time = time_stamp; String cursor = client.GetCursor(project, logStore, shardId, cursor_time). GetCursor(); client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor); }
- コンシューマーグループを削除する