ログを OSS ストレージに転送するだけでなく、DataWorks のデータ統合機能を使用してログデータを MaxCompute に転送することもできます。 データ統合は、Alibaba Group が外部ユーザーに提供する、安定、効率的、且つ柔軟で拡張可能なデータ同期プラットフォームです。 それはAlibaba Cloudsビッグデータコンピューティングエンジン(MaxCompute、AnalyticDB、および OSPS を含む)にオフラインのバッチデータアクセスチャネルを提供します。

この機能が使用可能なリージョンの詳細については、DataWorks を参照してください。

シナリオ

  • リージョンを跨げるデータソース(LogHub と MaxCompute)間のデータ同期
  • 異なる Alibaba Cloud アカウントを持つデータソース(LogHub と MaxCompute)間のデータ同期
  • 同じ Alibaba Cloud アカウントを持つデータソース(LogHub と MaxCompute)間のデータ同期
  • パブリッククラウドアカウントと AntCloud アカウントを使用したデータソース(LogHub と MaxCompute)間のデータ同期

前提条件

  1. Log Service、MaxCompute、および DataWorks が有効にされています。
  2. Log Serviceはログデータを正常に収集し、LogHub は転送するデータを持っています。
  3. データソースアカウントに対応するアクセスキーペアが有効になります。
  4. アカウントを跨げて転送するには、RAM 認証を構成する必要があります。

    詳細は、本ドキュメントのアカウント間でのログ配信の権限付与を実行するを参照してください。

手順

手順 1:データソースを作成します。

  1. DataWorksコンソールで Data Integration ページを開きます。 左側のナビゲーションバーで、Data Source をクリックします。
  2. Data Source ページで、右上隅の Add Data Source をクリックします。 Add Data Source ダイアログボックスが表示されます。
  3. Message Queue リストから LogHub をクリックします。 Add Data Source LogHub データソースページが表示されます。
    図 1. データソースの追加


  4. データソースの構成項目を設定します。
    次の表に構成項目を示します。
    構成項目 説明
    データソース名 データソース名は、文字、数字、およびアンダースコアで構成されます。 先頭は英字または下線でなければならず、60文字を超えることはできません。
    データソース説明 データソースに関する簡単な説明。最大で80文字です。
    LOG Endpoint Log Service のエンドポイント、http://yyy.com の形式で、お住まいのリージョンによって決まります 。 詳細は、Service endpointを参照してください。
    LOG Project ログデータの送信先となる MaxCompute の Log Service プロジェクト。 存在している、 作成済みのプロジェクトでなければなりません。
    Access Id/Access Key データソースアカウントのアクセスキーは、ログインパスワードと同じです。 プライマリアカウントのアクセスキーまたはデータソースのサブアカウントを入力できます。 構成が成功すると、現行のアカウントにデータソースのアカウントログへのアクセス権限が付与され、同期タスクを通じてデータソースアカウントのログを送信できます。
    図 2. LogHub データソースの作成


  5. Test Connectivity をクリックします、接続テストに成功した後、ページの右上隅にある Finish をクリックします。

手順2 同期タスクの構成

左側のナビゲーションバーで Synchronization Task をクリックし、Create a synchronization task をクリックして同期タスクを構成します。

Wizard Mode を選択してより簡単に構成できます、Script Mode を選択して、同期タスクの構成をさらにカスタマイズできます。

Wizard mode
タスク同期ノードの構成項目には、ソースの選択、ターゲットの選択、フィールドマッピング、およびチャネル制御があります。
  1. ソースを選択します。

    Data source: 手順1で構成したデータソースを選択します。 構成項目を次の表のように設定します:

    構成項目 説明
    Data source LogHub データソースの名前を選択します。
    Logstore 増分データのエクスポート元のテーブルの名前。 テーブルを作成するとき、または後で UpdateTable API を使用するときは、テーブルでストリーム機能を有効にする必要があります。
    ログの開始時刻 データ消費の開始時刻 このパラメーターは、yyyyMMddHHmmss(20180111013000など) の形式で時間範囲の左ボーダー(左閉右開)を定義し、DataWorks のスケジューリング時間パラメーターで機能します。
    ログの終了時刻 データ消費の終了時刻 このパラメーターは、yyyyMMddHHmmss(20180111013010など) の形式で時間範囲の右ボーダー(左閉右開)を定義し、DataWorks のスケジューリング時間パラメーター形式で機能します。
    バッチサイズ 毎回に読取りできるるデータ項目の数。 デフォルト値は256。
    構成項目を設定したら、Data Preview ドロップダウンボタンをクリックしてData Preview の詳細を表示します。 ログデータが取得されたことを確認して、Next をクリックします。
    データプレビューは、LogHubから選択されたいくつかのデータエントリを表示します。 同期データはログの開始時刻と終了時刻で設定されるため、プレビュー結果は設定した同期データと異なる場合があります。
    図 3. ソースを選択します。


  2. ターゲットを選択します。
    1. MaxComputeデータソースとターゲットテーブルを選択します。

      MaxComputeテーブルを作成していない場合は、右側の Generate Target Table in One Click をクリックします。 ポップアップメニューの [Create Data Table] を選択します。

    2. Partition information を入力します。

      パーティション構成は正規表現をサポートしています。 たとえば、パーティション「*」のpt値を設定して、すべてのptパーティションのデータを読み取ることができます。

    3. Clearing Rules を選択します。

      書き込み前に既存のデータを消去する(上書きモード)か、既存のデータを保持する(挿入モード)かを選択できます。

    構成後、 Next をクリックします。

    図 4. ターゲットを選択します。


  3. フィールドをマッピングします。

    フィールド間のマッピングを選択します。 フィールドのマッピング関係を設定する必要があります。 左側のソーステーブルフィールドは、右側のターゲットテーブルフィールドと1対1で対応しています。 Same row mapping をクリックして、 Same row mapping を選択、または選択解除できます。

    • ログフィールドを同期列として手動で追加する必要がある場合は、構成を使用してください。
    • 定数を入力することができます。 各定数は、'abc' や '123' のように、一重引用符で囲む必要があります。
    • $ {bdp.system.bizdate} などのスケジューリングパラメータを追加できます。
    • now()やcount(1)など、リレーショナルデータベースでサポートされている関数を入力できます。
    • 入力した値を解析できない場合、タイプは「識別されていません」と表示されます。
    図 5. マッピングフィールド


  4. トンネルを制御します。
    次の図に示すように、最大ジョブレートとダーティデータチェックルールを設定します。
    図 6. トンネルを制御します。


    構成項目説明:
    • DMU:データ統合中に消費されるリソース(CPU、メモリ、ネットワーク帯域幅など)を測定するデータマイグレーション単位。
    • 同時ジョブ数:データ同期タスクでデータ記憶メディアからデータを同時に読み書きするのに使用されるスレッドの最大数
  5. 構成をプレビューして保存します。

    設定が完了したら、上下にスクロールしてタスク設定を表示できます。 エラーが発生しない場合は、Saveをクリックします。

    図 7. 構成をプレビューして保存します。


Script mode

スクリプトを使用してタスクを設定するには、次のスクリプトを参照してください。

{
"type": "job",
"version": "1.0",
"configuration": {
"reader": {
"plugin": "loghub",
"parameter": {
"datasource": "loghub_lzz",//データソース名。 追加したデータリソース名を使用します。
"logstore": "logstore-ut2",//対象のログストア名。 ログストアは、ログサービス内のログデータの収集、保存、およびクエリ単位です。
"beginDateTime": "${startTime}",//データ消費の開始時刻。 このパラメータは、時間範囲の左のボーダーを定義します(左閉右開)
"endDateTime": "${endTime}",//データ消費の終了時刻。 このパラメータは、時間範囲の右のボーダーを定義します(左閉右開)
"batchSize": 256,//毎回に読み取りできるデータ項目の数 デフォルト値は256。
"splitPk": "",
"column": [
"key1",
"key2",
"key3"
]
}
},
"writer": {
"plugin": "odps",
"parameter": {
"datasource": "odps_first",//データソース名。 追加したデータリソース名を使用します。
"table": "ok",//対象のテーブル名
"truncate": true,
"partition": "",//シャード情報
"column": [//対象の列の名前
"key1",
"key2",
"key3"
]
}
},
"setting": {
"speed": {
"mbps": 8,//最大ジョブレート
"concurrent": 7//同時ジョブ数
}
}
}
}

手順3 タスクの実行

次のいずれかの方法でタスクを実行できます。

  • タスクを直接実行します(1回実行)。
    タスクの上にある Run をクリックして、data integration ページでタスクを直接実行します。 タスクを実行する前に、カスタムパラメータの値を設定します。
    図 8. 実行中のタスク構成

    上図に示すように、10:10~17:30 の間の LogHub レコードは、MaxCompute と同期しています。

  • タスクのスケジューリング

    Submit をクリックして、同期タスクをスケジューリングシステムに送信します。 スケジューリングシステムは、設定属性に従ってタスクを翌日から自動的かつ定期的に実行します。

    00:00〜23:59のスケジュール期間で、 startTime = $ [yyyymmddhh24miss-10/24/60]システムの10分前から endTime = $ [yyyymmddhh24miss-5/24/60]システムの5分前まで、スケジュール間隔を5分に設定します。 カスタムパラメータ構成の詳細については、 Parameter configuration を参照してください。

    図 9. スケジューリング設定


アカウント間でのログ配信の権限付与を実行する

アカウント間でログ配信タスクを構成するには、RAM上で権限付与を実行します。

  • アカウント間でのログ配信の権限付与を実行する

    プライマリアカウント間でデータを送信するには、ステップの Add LogHub Data Source でデータソースのプライマリアカウントのアクセスキーを入力します。 接続テストに合格した場合、権限付与は成功です。

    たとえば、アカウントBで有効された DataWorks サービスを通じてアカウントAのログデータをアカウントBの MaxCompute テーブルに転送するには、アカウントBでデータ統合タスクを設定し、ステップの Add LogHub Data Source でアカウントAのプライマリアカウントのアクセスキーを入力します 。 構成が成功した後、アカウントBにはアカウントAの下にあるすべてのログデータを読み取る権限が付与されます。

  • サブアカウント権限付与

    プライマリアカウントのアクセスキーを明らかにしたくない場合、またはサブアカウントによって収集されたログデータを転送する必要がある場合は、サブアカウントに対して明示的な承認を設定します。

    • サブアカウントに管理権限を割り当てる

      サブアカウントを通じてすべてのログデータをプライマリアカウントで転送する必要がある場合は、権限付与とアクセスキーの設定について次の手順を実行します。

      1. プライマリアカウントAを使用して、ログサービス管理権限(AliyunLogFullAccessおよびAliyunLogReadOnlyAccess)をサブアカウントA1に割り当てます。 詳細について、RAM ユーザーに Log Service へのアクセスを許可を参照してください。
      2. アカウントBを使用してデータ統合タスクを構成し、ステップの Add LogHub Data Source でデータソースのサブアカウントのアクセスキーを入力します。

      構成が成功した後、アカウントBにはアカウントAの下にあるすべてのログデータを読み取る権限が付与されます。

    • サブアカウントにカスタマイズ権限を割り当てます

      特定のログデータをサブアカウントを通じてプライマリアカウントで転送する必要がある場合は、権限付与とアクセスキーの設定について次の手順を実行します。

      1. プライマリアカウントAを使用して、サブアカウントA1のカスタマイズ権限付与ポリシーを設定します。関連する権限付与操作の詳細については、概要およびOverviewを参照してください。
      2. アカウントBを使用してデータ統合タスクを構成し、ステップの Add LogHub Data Source でデータソースのサブアカウントのアクセスキーを入力します。

      上記の手順が正常に完了すると、アカウントBにはアカウントAの指定されたログデータを読み取る権限が付与されます。

      カスタマイズ権限付与ポリシーの例

      このように、アカウントBは、サブアカウントA1を通じてLog Service内のproject_name1およびproject_name2データのみを同期できます。

      {
      "Version": "1",
      "Statement": [
      {
      "Action": [
      "log:Get*",
      "log:List*",
      "log:CreateConsumerGroup",
      "log:UpdateConsumerGroup",
      "log:DeleteConsumerGroup",
      "log:ListConsumerGroup",
      "log:ConsumerGroupUpdateCheckPoint",
      "log:ConsumerGroupHeartBeat",
      "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": [
      "acs:log:*:*:project/project_name1",
      "acs:log:*:*:project/project_name1/*",
      "acs:log:*:*:project/project_name2",
      "acs:log:*:*:project/project_name2/*"
      ],
      "Effect": "Allow"
      }
      ]
      }
    図 10. カスタマイズ権限付与ポリシー