Stormを使用したLogHubログの消耗

最終更新日: Oct 11, 2017

ログサービスのLogHubは、LogtailおよびSDKを介してログデータを収集するための効率的で信頼性の高いログチャネルを提供します。Spark StreamingやStormなどのリアルタイムシステムにアクセスして、LogHubに書き込まれたデータを使用することができます。

LogHub Storm spout機能は、LogHubからリアルタイムでデータを読み取り、StormユーザーのLogHub使用コストを削減します。

基本的なアーキテクチャとプロセス

1

  • 上の図では、LogHub Stormスパウトは赤い点線の囲みで囲まれています。各Stormトポロジには、ログストアからデータを読み取るスパウトのグループがあります。異なるトポロジーのスパウトは互いに独立しています。
  • 各トポロジは、一意のLogHubコンシューマグループ名によって識別されます。LogHubクライアントライブラリは、同じトポロジ内のスパウト間のロードバランシングと自動フェイルオーバーに使用されます。
  • スパウトは、リアルタイムでLogHubからデータを読み取り、トポロジーのボルトノードにデータを送信し、定期的に使用点をチェックポイントとしてLogHubサーバーに保存します。

注意:

  • 誤使用を防ぐため、各ログストアは最大5つのコンシューマグループをサポートしています。Java SDKのDeleteConsumerGroupインタフェースを使用して、未使用のコンシューマグループを削除できます。
  • スパウトとシャードの数は同数にすることをお勧めします。そうしないと、単一のスパウトが大量のデータを処理できないことがあります。
  • シャードに単一スパウトの処理能力を超える大量のデータが含まれている場合は、シャード分割インターフェースを使用してシャード単位のデータ量を減らすことができます。
  • スパムが正しくボルトにメッセージを送信することを確認するには、Storm ACKメカニズムの依存関係がLogHubスパウトで必須です。したがって、ボルトはそのような確認のためにACKを呼び出さなければなりません。

スパウト(トポロジの作成に使用)

  1. public static void main( String[] args )
  2. {
  3. String mode = "Local"; // Uses the local test mode.
  4. String conumser_group_name = ""; // Each topology must be assigned a unique consumer group name. The name can be 3 to 63 characters in length, and can include English letters, digits, underscores (_), and hyphens (-). It cannot be null and must begin and end with lowercase letters or numbers.
  5. String project = ""; // Project of the Log Service
  6. String logstore = ""; // LogStore of the Log Service
  7. String endpoint = ""; // Domain of the Log Service
  8. String access_id = ""; // User's access key
  9. String access_key = "";
  10. // Configurations required for creating a LogHub Storm spout.
  11. LogHubSpoutConfig config = new LogHubSpoutConfig(conumser_group_name,
  12. endpoint, project, logstore, access_id,
  13. access_key, LogHubCursorPosition.END_CURSOR);
  14. TopologyBuilder builder = new TopologyBuilder();
  15. // Creates a LogHub Storm spout.
  16. LogHubSpout spout = new LogHubSpout(config);
  17. // In the actual condition, the number of spouts may be equal to the number of LogStore shards.
  18. builder.setSpout("spout", spout, 1);
  19. builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
  20. Config conf = new Config();
  21. conf.setDebug(false);
  22. conf.setMaxSpoutPending(1);
  23. // The serialization method LogGroupDataSerializSerializer of LogGroupData must be configured explicitly when Kryo is used for data serialization and deserialization.
  24. Config.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);
  25. if (mode.equals("Local")) {
  26. logger.info("Local mode...");
  27. LocalCluster cluster = new LocalCluster();
  28. cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());
  29. try {
  30. Thread.sleep(6000 * 1000); //waiting for several minutes
  31. } catch (InterruptedException e) {
  32. // TODO Auto-generated catch block
  33. e.printStackTrace();
  34. }
  35. cluster.killTopology("test-jstorm-spout");
  36. cluster.shutdown();
  37. } else if (mode.equals("Remote")) {
  38. logger.info("Remote mode...");
  39. conf.setNumWorkers(2);
  40. try {
  41. StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
  42. } catch (AlreadyAliveException e) {
  43. // TODO Auto-generated catch block
  44. e.printStackTrace();
  45. } catch (InvalidTopologyException e) {
  46. // TODO Auto-generated catch block
  47. e.printStackTrace();
  48. }
  49. } else {
  50. logger.error("invalid mode: " + mode);
  51. }
  52. }
  53. }

データを使用するボルトのサンプルコード(各ログの内容のみが出力されます)

  1. public class SampleBolt extends BaseRichBolt {
  2. private static final long serialVersionUID = 4752656887774402264L;
  3. private static final Logger logger = Logger.getLogger(BaseBasicBolt.class);
  4. private OutputCollector mCollector;
  5. @Override
  6. public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
  7. OutputCollector collector) {
  8. mCollector = collector;
  9. }
  10. @Override
  11. public void execute(Tuple tuple) {
  12. String shardId = (String) tuple
  13. .getValueByField(LogHubSpout.FIELD_SHARD_ID);
  14. @SuppressWarnings("unchecked")
  15. List<LogGroupData> logGroupDatas = (ArrayList<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);
  16. for (LogGroupData groupData : logGroupDatas) {
  17. // Each LogGroup consists of one or more logs.
  18. LogGroup logGroup = groupData.GetLogGroup();
  19. for (Log log : logGroup.getLogsList()) {
  20. StringBuilder sb = new StringBuilder();
  21. // Each log has a time field and multiple key–value pairs.
  22. int log_time = log.getTime();
  23. sb.append("LogTime:").append(log_time);
  24. for (Content content : log.getContentsList()) {
  25. sb.append("\t").append(content.getKey()).append(":")
  26. .append(content.getValue());
  27. }
  28. logger.info(sb.toString());
  29. }
  30. }
  31. // The dependency on the Storm ACK mechanism is mandatory in LogHub spouts to confirm that spouts correctly send messages
  32. // to bolts. Therefore, bolts must call ACK for such confirmation.
  33. mCollector.ack(tuple);
  34. }
  35. @Override
  36. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  37. //do nothing
  38. }
  39. }

Maven

Storm 1.0より前のバージョン(たとえば、0.9.6)では、次のコードを使用します。

  1. <dependency>
  2. <groupId>com.aliyun.openservices</groupId>
  3. <artifactId>loghub-storm-spout</artifactId>
  4. <version>0.6.5</version>
  5. </dependency>

Storm 1.0以降のバージョンでは、次のコードを使用してください。

  1. <dependency>
  2. <groupId>com.aliyun.openservices</groupId>
  3. <artifactId>loghub-storm-1.0-spout</artifactId>
  4. <version>0.1.2</version>
  5. </dependency>