このドキュメントでは、Alibaba Cloud Realtime Compute を使用してデータを処理し、データを Alibaba Cloud Elasticsearch (ES) にインポートする方法について説明します。

始める前に

このタスクについて

Alibaba Cloud Realtime Compute は、Alibaba Cloud Flink プロダクトです。 Kafka や Elasticsearch など、複数のデータソースとデータ消費サービスをサポートしています。DDL の概要 Alibaba Cloud Realtime Compute および Elasticsearch を使用して、ログ取得シナリオのビジネス要件を満たすことができます。

Kafka または Log Service のログは、単純または複雑な Flink SQL 文を使用して Realtime Compute によって処理され、検索サービスのソースデータとして Alibaba Cloud Elasticsearch にインポートされます。 Realtime Compute の強力なコンピューティング機能と Alibaba Cloud Elasticsearch の検索機能により、リアルタイムのデータ処理と検索を実現し、ビジネスをリアルタイムサービスに変換できます。 さらに、Realtime Compute は Alibaba Cloud Elasticsearch と簡単に連携できます。 次の例は、Realtime Compute を Alibaba Cloud Elasticsearch と連携する方法を示しています。

たとえば、ログまたはデータが Log Service にインポートされ、Alibaba Cloud Elasticsearch にインポートする前にデータを処理する必要があると仮定します。 次の図は、データ消費のパイプラインを示しています。Realtime Compute と Alibaba Cloud Elasticsearch の連携

Realtime Compute ジョブを作成する

  1. Realtime Compute コンソール にログインし、ジョブを作成します。
  2. Flink SQL 文を記述します。
    1. Log Service テーブルを作成します。
      create table sls_stream(
        a int,
        b int,
        c VARCHAR
      )
      WITH (
        type ='sls',  
        endPoint ='<yourEndpoint>',
        accessId ='<yourAccessId>',
        accessKey ='<yourAccessKey>',
        startTime = '<yourStartTime>',
        project ='<yourProjectName>',
        logStore ='<yourLogStoreName>',
        consumerGroup ='<yourConsumerGroupName>'
      );
      WITH 変数の説明は次のとおりです。
      変数 説明
      <yourEndpoint> Alibaba Cloud Log Service のパブリックエンドポイント。 エンドポイントは、Log Service のプロジェクトや内部ログデータにアクセスするために使用される URL です。 詳細は、「サービスエンドポイント」をご参照ください。

      たとえば、中国 (杭州) の Log Service のエンドポイントはhttp://cn-hangzhou.log.aliyuncs.com です。 エンドポイントが http:// で始まることを確認してください。

      <yourAccessId> Log Service へのアクセスに使用される AccessKey ID。
      <yourAccessKey> Log Service へのアクセスに使用される AccessKey シークレット。
      <yourStartTime> ログデータが消費される時間範囲の開始時間。 Realtime Compute ジョブを実行するときは、この変数で指定された開始時間よりも早い時間を指定する必要があります。
      <yourProjectName> Log Service プロジェクトの名前。
      <yourLogStoreName> プロジェクト内の Logstore の名前。
      <yourConsumerGroupName> Log Service コンシューマグループの名前。

      WITH 変数の詳細については、「Log Service テーブルの作成」をご参照ください。

    2. Elasticsearch 結果テーブルを作成します。
      重要
      • Realtime Compute V3.2.2 以降のバージョンでは、Elasticsearch 結果テーブルの対応が追加されています。 Realtime Compute ジョブを作成するときに、正しいバージョンを選択してください。
      • Elasticsearch の結果テーブルは RESTful API に基づいています。 したがって、すべての Elasticsearch バージョンと互換性があります。
      CREATE TABLE es_stream_sink(
        a int,
        cnt BIGINT,
        PRIMARY KEY(a)
      )
      WITH(
        type ='elasticsearch',
        endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessSecret>',
        index = '<yourIndex>',
        typeName = '<yourTypeName>'
      );
      WITH 変数の説明は次のとおりです。
      変数 説明
      <Instances> Elasticsearch インスタンスの ID。 インスタンスの [基本情報] ページでインスタンス ID を確認できます。

      例:es-cn-45xxxxxxxxxxxxk1q

      <port> Elasticsearch インスタンスのパブリックネットワークポート。 インスタンスの [基本情報] ページでパブリックネットワークポートを確認できます。

      デフォルトのポート番号は、9200 です。

      <yourAccessId> Elasticsearch インスタンスへのアクセスと Kibana コンソールへのログインに使用されるユーザー名。 デフォルトのユーザー名は "elastic" です。
      <yourAccessKey> Elasticsearch インスタンスにアクセスし、 Kibana コンソールにログインするために使用されるパスワード。 パスワードは、Elasticsearch インスタンスを作成するときに指定されます。
      <yourIndex> Elasticsearch インスタンス上のドキュメントのインデックス。 インデックスはデータベース名のようなものです。 ドキュメントのインデックスが作成されていない場合は、最初にインデックスを作成してください。 詳細は、「インデックスの作成」をご参照ください。
      <yourTypeName> インデックスのタイプ。 タイプは、データベース内のテーブルの名前のようなものです。 インデックスにタイプが指定されていない場合は、最初にタイプを指定します。 詳細は、「インデックスの作成」をご参照ください。

      WITH 変数の詳細については、「ElasticSearch 結果テーブルの作成」をご参照ください。

      • Elasticsearch は、PRIMARY KEY フィールドに含まれるドキュメント ID に従ったドキュメントの更新をサポートしています。 PRIMARY KEY フィールドとして指定できるフィールドは 1 つだけです。 PRIMARY KEY フィールドを指定すると、フィールドの値がドキュメント ID として使用されます。 ドキュメント ID は、PRIMARY KEY フィールドのないドキュメントに対してランダムに生成されます。 詳細については『Index API』をご参照ください。
      • Elasticsearch は複数の更新モードをサポートしています。 updateMode パラメーターを設定して、更新モードを指定できます。
        • updateMode=full の場合、新しいドキュメントが既存のドキュメントを上書きします。
        • updateMode=inc の場合、新しい値が、関連するフィールドの既存の値を上書きします。
      • Elasticsearch のすべての更新は、データの挿入または更新を意味する UPSERT 構文に従います。
    3. データ消費ロジックを作成し、データを同期します。
      INSERT INTO es_stream_sink
      SELECT 
        a,
        count(*) as cnt
      FROM sls_stream GROUP BY a
  3. ジョブを送信して実行します。
    ジョブを送信して実行すると、Log Service に保存されたデータが集約され、Elasticsearch にインポートされます。 Realtime Compute は他の計算操作もサポートしています。たとえば、データビューやユーザー定義拡張機能 (UDX) を作成できます。 詳細は、「Flink SQL の概要」をご参照ください。

まとめ

Alibaba Cloud Realtime Compute と Elasticsearch を使用すると、独自のリアルタイム検索サービスをすばやく作成できます。 Alibaba Cloud Elasticsearch にデータをインポートするために、より複雑なロジックが必要な場合は、Realtime Compute のカスタムシンク機能を使用してください。 詳細は、「カスタム結果テーブルの作成」をご参照ください。