Flink log connector は、Alibaba Cloud Log Service が提供するツールで、 Flink に接続するために使用され、 コンシューマとプロデューサで構成されています。
コンシューマは、 Log Service からデータを読み込みます。 これは、一度の構文とシャードベースのロードバランシングをサポートしています。
プロデューサは Log Service にデータを書き込みます。 コネクタを使用するときは、 Maven 依存関係をプロジェクトに追加する必要があります。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>flink-log-connector</artifactId>
<version>0.1.7</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.19</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>log-loghub-producer</artifactId>
<version>0.1.8</version>
</dependency>
前提条件
- アクセスキーが有効になり、プロジェクトとログストアが作成されました。 詳しい手順については、準備を参照してください。
- サブアカウントを使用してログサービスにアクセスするには、ログストアのリソースアクセス管理(RAM)ポリシーを正しく設定していることを確実にしてください。 詳細は、RAM ユーザーに Log Service へのアクセスを許可を参照してください。
ログコンシューマ
コネクタでは、Flink ログコンシューマは、 Log Service 特定の LogStore に購読する機能を提供し、正確に 1 回の構文を実現します。 使用中に、LogStore のシャード数の変更について心配する必要はありません。
Flink の各サブタスクは、 LogStore でいくつかの断片を消費します。 LogStore 内のシャードが分割またはマージされている場合、サブタスクが消費するシャードはそれに応じて変更されます。
関連API
Flinkログコンシューマは次のAlibaba Cloud Log Service APIを使用します:
-
Getcursorordata
このAPIは、シャードからデータを引き出すために使用されます。 このAPIが頻繁に呼び出されると、データがLog Serviceのシャードクォータを超える可能性があります。 ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLISおよびConfigConstants.LOG_MAX_NUMBER_PER_FETCHを使用して、API呼び出しの時間間隔および各呼び出しによってプルされるログの数を制御できます。 シャードクォータの詳細については、シャードを参照してください。
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100"); configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
-
ListShards
このAPIは、ログストア内のすべてのシャード、及びシャードステータスのリストを取得するために使用されます。 シャードが常に分割されてマージされている場合は、APIの呼び出し期間を調整して、シャードの時間的な変化を検出できます。
//30分ごとに ListShard を呼び出します configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000")
-
CreateConsumerGroup
このAPIは、消費進捗モニタリングが有効になっている場合にのみ呼び出されます。 チェックポイントを同期するためのコンシューマグループを作成するために使用されます。
-
ConsumerGroupUpdateCheckPoint
このAPIは、Flink のスナップショットを Log ServiceのConsumerGroup に同期させるために使用されます。
ユーザー権限
API | リソース |
---|---|
log:GetCursorOrData | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
log:ListShards | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
log:CreateConsumerGroup | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ConsumerGroupUpdateCheckPoint | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
設定手順
1.スタートアップパラメータの設定
Properties configProps = new Properties();
// Set the domain to access Log Service
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// Set the AccessKey
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// Set the Log Service project
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// Set the Log Service LogStore
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// Set the start position to consume Log Service
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// Set the message deserialization method for Log Service
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RawLogGroupList> logTestStream = env.addSource(
new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));
2. 消費開始位置の設定
Flink ログコンシューマでシャードを消費するための開始位置を設定できます。 ConfigConstants.LOG_CONSUMER_BEGIN_POSITION を設定することで、シャードをヘッダーまたは末尾から消費するか、特定の時点で消費するかを設定できます。 具体的な値は次のとおりです:
- Consts.LOG_BEGIN_CURSOR: シャードがそのヘッダー、つまりシャードの最も早いデータから消費されることを示します。
- Consts.LOG_END_CURSOR: シャードがその末尾、つまりシャードの最新データから消費されることを示します。
- Constellation S. MAID: 特定のJavaグループから保存されたチェックポイントがconfigconstantsを通じて消費され始めることを示します。 特定のlocergroupを指定してください。
- UnixTimestamp: 1970-01-01からの秒数で表される整数値の String。 この時点からシャードが消費されたことを示します。
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
3. 消費進捗モニタリングの設定(オプション)
Flinkログコンシューマは消費の進捗モニタリングをサポートします。 消費の進行状況は、各シャードのリアルタイムの消費位置を取得することであり、これはタイムスタンプで表されます。 詳細については、コンシューマーグループ - ステータス表示とコンシューマーグループ - アラームモニタリングを参照してください。
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name");
4. 耐災害性と 1 回限りの構文をサポート
Flink のチェックポイント機能が有効になっている場合、Flink ログコンシューマは各シャードの消費の進行状況を定期的に保存します。 ジョブが失敗すると、Flink はログコンシューマを再開し、保存されている最新のチェックポイントから消費を開始します。
チェックポイントの書き込み期間は、障害が発生した場合にロールバック(つまり再消費)されるデータの最大量を定義します。 コードは次の通りです。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable the exactly-once syntax on Flink
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Store the checkpoint every 5s
env.enableCheckpointing(5000);
Flink チェックポイントについて詳しくは、Flink の公式文書 Checkpoints を参照してください。
Log Producer
ユーザー権限
プロデューサは、ログサービスの次のAPIを使用してデータを書き込みます。
- Log: postlogstorelogs
- log:ListShards
API | リソース |
---|---|
Log: postlogstorelogs | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/alert/${alarmName} |
log:ListShards | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/alert/${alarmName} |
手順
- プロデューサの初期化。
- プロデューサの設定パラメータPropertiesを初期化します。
これは、コンシューマの場合と似ています。 プロデューサにはいくつかのカスタムパラメータがあります。 通常は、これらのパラメータをデフォルト値に設定します。 特別なシナリオで値をカスタマイズできます。
上記のパラメータは必須ではありません。 デフォルト値のままでかまいません。//データの送信に使用されたI / Oスレッドの数 デフォルト値は 8 です。 ConfigConstants.LOG_SENDER_IO_THREAD_COUNT //ログデータがキャッシュされた時刻。 デフォルト値は3000 です。 ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS //キャッシュされたパッケージのログ数 デフォルト値は 4096 です。 ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE //キャッシュされたパッケージのサイズ デフォルト値は 3Mb です。 ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE //ジョブが使用できる合計メモリサイズ。 デフォルト値は 100Mb です。 ConfigConstants.LOG_MEM_POOL_BYTES
- LogSerializationSchema をリロードして、データを RawLogGroup にシリアル化するためのメソッドを定義します。
RawLogGroup はログのコレクションです。 各フィールドの意味についての詳細は、Data modelを参照してください。
ログサービスのshardHashKey関数を使用するには、データが書き込まれるシャードを指定します。 Log Partitionerを次のように使用してデータのハッシュキーを生成できます:
例:FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps); logProducer.setCustomPartitioner(new LogPartitioner<String>() { // Generate a 32-bit hash value public String getHashKey(String element) { try { MessageDigest md = MessageDigest.getInstance("MD5"); md.update(element.getBytes()); String hash = new BigInteger(1, md.digest()).toString(16); while(hash.length() < 32) hash = "0" + hash; return hash; } catch (NoSuchAlgorithmException e) { } return "0000000000000000000000000000000000000000000000000000000000000000"; } });
注 LogPartitioner はオプションです。 このパラメーターが設定されていない場合、データはランダムにシャードに書き込まれます。
- プロデューサの設定パラメータPropertiesを初期化します。
- 次の使用例では、シミュレーションによって生成された文字列をLog Serviceに書き込みます。
// Log Serviceのデータ形式にデータをシリアル化する class SimpleLogSerializer implements LogSerializationSchema<String> { public RawLogGroup serialize(String element) { RawLogGroup rlg = new RawLogGroup(); RawLog rl = new RawLog(); rl.setTime((int)(System.currentTimeMillis() / 1000)); rl.addContent("message", element); rlg.addLog(rl); return rlg; } } public class ProducerSample { public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com"; public static String sAccessKeyId = ""; public static String sAccessKey = ""; public static String sProject = "ali-cn-hangzhou-sls-admin"; public static String sLogstore = "test-flink-producer"; private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class); public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(3); DataStream<String> simpleStringStream = env.addSource(new EventsGenerator()); Properties configProps = new Properties(); //ログサービスへのアクセスに使用されるドメインの名前を設定します。 configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint); //ログサービスにアクセスするようにAccessKeyを設定します configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId); configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey); //ログが書き込まれるLog Serviceプロジェクトを設定します configProps.put(ConfigConstants.LOG_PROJECT, sProject); //ログが書き込まれるログサービスLogStoreを設定します configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore); FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps); simpleStringStream.addSink(logProducer); env.execute("flink log producer"); } //ログ生成をシミュレートする public static class EventsGenerator implements SourceFunction<String> { private boolean running = true; @Override public void run(SourceContext<String> ctx) throws Exception { long seq = 0; while (running) { Thread.sleep(1000); ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12)); } } @Override public void cancel() { running = false; } } }