Flink log connector は、Alibaba Cloud Log Service が提供するツールで、 Flink に接続するために使用され、 コンシューマとプロデューサで構成されています。

コンシューマは、 Log Service からデータを読み込みます。 これは、一度の構文とシャードベースのロードバランシングをサポートしています。

プロデューサは Log Service にデータを書き込みます。 コネクタを使用するときは、 Maven 依存関係をプロジェクトに追加する必要があります。

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.3.2</version>
</dependency>
<dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>flink-log-connector</artifactId>
            <version>0.1.7</version>
</dependency>
<dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
</dependency>
 <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>aliyun-log</artifactId>
            <version>0.6.19</version>
 </dependency>
<dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>log-loghub-producer</artifactId>
            <version>0.1.8</version>
</dependency>

前提条件

  1. アクセスキーが有効になり、プロジェクトとログストアが作成されました。 詳しい手順については、準備を参照してください。
  2. サブアカウントを使用してログサービスにアクセスするには、ログストアのリソースアクセス管理(RAM)ポリシーを正しく設定していることを確実にしてください。 詳細は、RAM ユーザーに Log Service へのアクセスを許可を参照してください。

ログコンシューマ

コネクタでは、Flink ログコンシューマは、 Log Service 特定の LogStore に購読する機能を提供し、正確に 1 回の構文を実現します。 使用中に、LogStore のシャード数の変更について心配する必要はありません。

Flink の各サブタスクは、 LogStore でいくつかの断片を消費します。 LogStore 内のシャードが分割またはマージされている場合、サブタスクが消費するシャードはそれに応じて変更されます。

関連API

Flinkログコンシューマは次のAlibaba Cloud Log Service APIを使用します:

  • Getcursorordata

    このAPIは、シャードからデータを引き出すために使用されます。 このAPIが頻繁に呼び出されると、データがLog Serviceのシャードクォータを超える可能性があります。 ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLISおよびConfigConstants.LOG_MAX_NUMBER_PER_FETCHを使用して、API呼び出しの時間間隔および各呼び出しによってプルされるログの数を制御できます。 シャードクォータの詳細については、シャードを参照してください。

    configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
    configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
  • ListShards

    このAPIは、ログストア内のすべてのシャード、及びシャードステータスのリストを取得するために使用されます。 シャードが常に分割されてマージされている場合は、APIの呼び出し期間を調整して、シャードの時間的な変化を検出できます。

    //30分ごとに ListShard を呼び出します
    configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000")
  • CreateConsumerGroup

    このAPIは、消費進捗モニタリングが有効になっている場合にのみ呼び出されます。 チェックポイントを同期するためのコンシューマグループを作成するために使用されます。

  • ConsumerGroupUpdateCheckPoint

    このAPIは、Flink のスナップショットを Log ServiceのConsumerGroup に同期させるために使用されます。

ユーザー権限

次の表は、サブユーザーがFlinkログコンシューマを使用するために必要な RAM 権限付与ポリシーを示しています。
API リソース
log:GetCursorOrData acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:ListShards acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:CreateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

設定手順

1.スタートアップパラメータの設定

Properties configProps = new Properties();
// Set the domain to access Log Service
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// Set the AccessKey
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// Set the Log Service project
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// Set the Log Service LogStore
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// Set the start position to consume Log Service
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// Set the message deserialization method for Log Service
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RawLogGroupList> logTestStream = env.addSource(
        new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));
上記は簡単な使用例です。 java.util.Propertiesが設定ツールとして使用されているので、すべてのコンシューマの設定はConfigConstantsにあります。
Flink ストリーム内のサブタスクの数は、Log Service LogStore 内のシャードの数とは無関係です。 シャードの数がサブタスクの数よりも多い場合、各サブタスクは複数のシャードを1回だけ消費します。 シャードの数がサブタスクの数より少ない場合、一部のサブタスクは新しいシャードが生成されるまでアイドルです。

2. 消費開始位置の設定

Flink ログコンシューマでシャードを消費するための開始位置を設定できます。 ConfigConstants.LOG_CONSUMER_BEGIN_POSITION を設定することで、シャードをヘッダーまたは末尾から消費するか、特定の時点で消費するかを設定できます。 具体的な値は次のとおりです:

  • Consts.LOG_BEGIN_CURSOR: シャードがそのヘッダー、つまりシャードの最も早いデータから消費されることを示します。
  • Consts.LOG_END_CURSOR: シャードがその末尾、つまりシャードの最新データから消費されることを示します。
  • Constellation S. MAID: 特定のJavaグループから保存されたチェックポイントがconfigconstantsを通じて消費され始めることを示します。 特定のlocergroupを指定してください。
  • UnixTimestamp: 1970-01-01からの秒数で表される整数値の String。 この時点からシャードが消費されたことを示します。
上記の3つの値の例は次のとおりです:
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
flinkタスクを開始したときに flink 自体のステートバックエンドからのリカバリーをセットアップした場合、connector は上記の構成を無視し、statebackend に保存されたチェックポイントを使用します。

3. 消費進捗モニタリングの設定(オプション)

Flinkログコンシューマは消費の進捗モニタリングをサポートします。 消費の進行状況は、各シャードのリアルタイムの消費位置を取得することであり、これはタイムスタンプで表されます。 詳細については、コンシューマーグループ - ステータス表示コンシューマーグループ - アラームモニタリングを参照してください。

configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name");
上記のコードはオプションです。 設定すると、コンシューマはまずコンシューマグループを作成します。コンシューマグループがすでに存在する場合、それ以上の操作は必要ありません。 コンシューマ内のスナップショットは自動的に Log Service のコンシューマグループに同期されます。 Log Service コンソールでコンシューマの消費状況を確認できます。

4. 耐災害性と 1 回限りの構文をサポート

Flink のチェックポイント機能が有効になっている場合、Flink ログコンシューマは各シャードの消費の進行状況を定期的に保存します。 ジョブが失敗すると、Flink はログコンシューマを再開し、保存されている最新のチェックポイントから消費を開始します。

チェックポイントの書き込み期間は、障害が発生した場合にロールバック(つまり再消費)されるデータの最大量を定義します。 コードは次の通りです。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable the exactly-once syntax on Flink
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Store the checkpoint every 5s
env.enableCheckpointing(5000);

Flink チェックポイントについて詳しくは、Flink の公式文書 Checkpoints を参照してください。

Log Producer

Flink ログプロデューサは、 Alibaba Cloud Log Service データを書き込みます。
プロデューサは少なくとも一度の点で Flink をサポートしています。 つまり、ジョブ障害が発生した場合、 Log Service に書き込まれたデータは複製されますが、失われることはありません。

ユーザー権限

プロデューサは、ログサービスの次のAPIを使用してデータを書き込みます。

  • Log: postlogstorelogs
  • log:ListShards
RAM サブユーザーがプロデューサーを使用する場合は、上記の2つのAPIを許可する必要があります。
API リソース
Log: postlogstorelogs acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/alert/${alarmName}
log:ListShards acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/alert/${alarmName}

手順

  1. プロデューサの初期化。
    1. プロデューサの設定パラメータPropertiesを初期化します。
      これは、コンシューマの場合と似ています。 プロデューサにはいくつかのカスタムパラメータがあります。 通常は、これらのパラメータをデフォルト値に設定します。 特別なシナリオで値をカスタマイズできます。
      //データの送信に使用されたI / Oスレッドの数 デフォルト値は 8 です。
      ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
      //ログデータがキャッシュされた時刻。 デフォルト値は3000 です。
      ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
      //キャッシュされたパッケージのログ数 デフォルト値は 4096 です。
      ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
      //キャッシュされたパッケージのサイズ デフォルト値は 3Mb です。
      ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
      //ジョブが使用できる合計メモリサイズ。 デフォルト値は 100Mb です。
      ConfigConstants.LOG_MEM_POOL_BYTES
      上記のパラメータは必須ではありません。 デフォルト値のままでかまいません。
    2. LogSerializationSchema をリロードして、データを RawLogGroup にシリアル化するためのメソッドを定義します。

      RawLogGroup はログのコレクションです。 各フィールドの意味についての詳細は、Data modelを参照してください。

      ログサービスのshardHashKey関数を使用するには、データが書き込まれるシャードを指定します。 Log Partitionerを次のように使用してデータのハッシュキーを生成できます:

      例:
      FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
      logProducer.setCustomPartitioner(new LogPartitioner<String>() {
            // Generate a 32-bit hash value
            public String getHashKey(String element) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md.update(element.getBytes());
                    String hash = new BigInteger(1, md.digest()).toString(16);
                    while(hash.length() < 32) hash = "0" + hash;
                    return hash;
                } catch (NoSuchAlgorithmException e) {
                }
                return "0000000000000000000000000000000000000000000000000000000000000000";
            }
        });
      LogPartitioner はオプションです。 このパラメーターが設定されていない場合、データはランダムにシャードに書き込まれます。
  2. 次の使用例では、シミュレーションによって生成された文字列をLog Serviceに書き込みます。
    // Log Serviceのデータ形式にデータをシリアル化する
    class SimpleLogSerializer implements LogSerializationSchema<String> {
        public RawLogGroup serialize(String element) {
            RawLogGroup rlg = new RawLogGroup();
            RawLog rl = new RawLog();
            rl.setTime((int)(System.currentTimeMillis() / 1000));
            rl.addContent("message", element);
            rlg.addLog(rl);
            return rlg;
        }
    }
    public class ProducerSample {
        public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        public static String sAccessKeyId = "";
        public static String sAccessKey = "";
        public static String sProject = "ali-cn-hangzhou-sls-admin";
        public static String sLogstore = "test-flink-producer";
        private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);
        public static void main(String[] args) throws Exception {
            final ParameterTool params = ParameterTool.fromArgs(args);
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setGlobalJobParameters(params);
            env.setParallelism(3);
            DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());
            Properties configProps = new Properties();
            //ログサービスへのアクセスに使用されるドメインの名前を設定します。
            configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
            //ログサービスにアクセスするようにAccessKeyを設定します
            configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);
            configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
            //ログが書き込まれるLog Serviceプロジェクトを設定します
            configProps.put(ConfigConstants.LOG_PROJECT, sProject);
            //ログが書き込まれるログサービスLogStoreを設定します
            configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);
            FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
            simpleStringStream.addSink(logProducer);
            env.execute("flink log producer");
        }
        //ログ生成をシミュレートする
        public static class EventsGenerator implements SourceFunction<String> {
            private boolean running = true;
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                long seq = 0;
                while (running) {
                    Thread.sleep(1000);
                    ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
                }
            }
            @Override
            public void cancel() {
                running = false;
            }
        }
    }