データインポート

最終更新日: Nov 26, 2017

MaxComputeは、コンソールでTunnel Operationを直接使用するか、javaで書かれたTUNNELを使用して、データのインポートとエクスポートの2つの方法を提供します。

Tunnelコマンド

データ準備

ローカルファイルwc_example.txtを準備し、それに対応する内容を次のように示しているとします。

  1. I LOVE CHINA!
  2. MY NAME IS MAGGIE.I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL!

ここで、ファイルをD:¥odps¥odps¥binディレクトリに保存します。

ODPSテーブルの作成

上記のデータをMaxComputeテーブルにインポートする必要があるので、ここでは最初にテーブルを作成する必要があります。

  1. CREATE TABLE wc_in (word string);

Tunnelコマンドの実行

入力テーブルが正常に作成されたら、次のようにして、MaxComputeコンソールのデータをトンネルコマンドでインポートできます。

  1. tunnel upload D:\odps\odps\bin\wc_example.txt wc_in;

実行が成功したら、次のようにテーブルwc_inのレコードを確認します。

  1. odps@ $odps_project>select * from wc_in;
  2. ID = 20150918110501864g5z9c6
  3. Log view:
  4. http://webconsole.odps.aliyun-inc.com:8080/logview/?h=http://service-corp.odps.aliyun-inc.com/api&p=odps_public_dev&i=20150918
  5. QWxsb3ciLCJSZXNvdXJjZSI6WyJhY3M6b2RwczoqOnByb2plY3RzL29kcHNfcHVibGljX2Rldi9pbnN0YW5jZXMvMjAxNTA5MTgxMTA1MDE4NjRnNXo5YzYiXX1dLC
  6. +------+
  7. | word |
  8. +------+
  9. | I LOVE CHINA! |
  10. | MY NAME IS MAGGIE.I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL! |
  11. +------+

注意:

  • トンネルコマンドの詳細については、トンネル操作を参照してください。

トンネルSDK

SDKトンネルを使用してデータをアップロードする方法については、次の簡単なシナリオを紹介します。シナリオの説明:ODPSにデータをアップロードします。プロジェクトは「odps_public_dev」、テーブル名は「tunnel_sample_test」、パーティションは「pt = 20150801、dt = “hangzhou”」です。

  1. テーブルを作成して対応するパーティションを追加する:

    1. CREATE TABLE IF NOT EXISTS tunnel_sample_test(
    2. id STRING,
    3. name STRING)
    4. PARTITIONED BY (pt STRING, dt STRING); --Create a table.
    5. ALTER TABLE tunnel_sample_test
    6. ADD IF NOT EXISTS PARTITION (pt='20150801',dt='hangzhou'); --Add the partitions.
  2. 次のように、UploadSampleのプログラムディレクトリ構造を作成します。

    1. |---pom.xml
    2. |---src
    3. |---main
    4. |---java
    5. |---com
    6. |---aliyun
    7. |---odps
    8. |---tunnel
    9. |---example
    10. |---UploadSample.java

    UploadSample:トンネルソースファイル。pom.xml:mavenプログラムファイル。

  3. Write UploadSampleプログラムを次のように実行します。

    1. package com.aliyun.odps.tunnel.example;
    2. import java.io.IOException;
    3. import java.util.Date;
    4. import com.aliyun.odps.Column;
    5. import com.aliyun.odps.Odps;
    6. import com.aliyun.odps.PartitionSpec;
    7. import com.aliyun.odps.TableSchema;
    8. import com.aliyun.odps.account.Account;
    9. import com.aliyun.odps.account.AliyunAccount;
    10. import com.aliyun.odps.data.Record;
    11. import com.aliyun.odps.data.RecordWriter;
    12. import com.aliyun.odps.tunnel.TableTunnel;
    13. import com.aliyun.odps.tunnel.TunnelException;
    14. import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
    15. public class UploadSample {
    16. private static String accessId = "####";
    17. private static String accessKey = "####";
    18. private static String tunnelUrl = "http://dt-corp.odps.aliyun-inc.com";
    19. private static String odpsUrl = "http://service-corp.odps.aliyun-inc.com/api";
    20. private static String project = "odps_public_dev";
    21. private static String table = "tunnel_sample_test";
    22. private static String partition = "pt=20150801,dt=hangzhou";
    23. public static void main(String args[]) {
    24. Account account = new AliyunAccount(accessId, accessKey);
    25. Odps odps = new Odps(account);
    26. odps.setEndpoint(odpsUrl);
    27. odps.setDefaultProject(project);
    28. try {
    29. TableTunnel tunnel = new TableTunnel(odps);
    30. tunnel.setEndpoint(tunnelUrl);
    31. PartitionSpec partitionSpec = new PartitionSpec(partition);
    32. UploadSession uploadSession = tunnel.createUploadSession(project,
    33. table, partitionSpec);
    34. System.out.println("Session Status is : "
    35. + uploadSession.getStatus().toString());
    36. TableSchema schema = uploadSession.getSchema();
    37. RecordWriter recordWriter = uploadSession.openRecordWriter(0);
    38. Record record = uploadSession.newRecord();
    39. for (int i = 0; i < schema.getColumns().size(); i++) {
    40. Column column = schema.getColumn(i);
    41. switch (column.getType()) {
    42. case BIGINT:
    43. record.setBigint(i, 1L);
    44. break;
    45. case BOOLEAN:
    46. record.setBoolean(i, true);
    47. break;
    48. case DATETIME:
    49. record.setDatetime(i, new Date());
    50. break;
    51. case DOUBLE:
    52. record.setDouble(i, 0.0);
    53. break;
    54. case STRING:
    55. record.setString(i, "sample");
    56. break;
    57. default:
    58. throw new RuntimeException("Unknown column type: "
    59. + column.getType());
    60. }
    61. }
    62. for (int i = 0; i < 10; i++) {
    63. recordWriter.write(record);
    64. }
    65. recordWriter.close();
    66. uploadSession.commit(new Long[]{0L});
    67. System.out.println("upload success!");
    68. } catch (TunnelException e) {
    69. e.printStackTrace();
    70. } catch (IOException e) {
    71. e.printStackTrace();
    72. }
    73. }
    74. }

    注意:ここでは、accessIdとaccesskeyの設定を無視しました。実際の操作では、独自のaccessIdとaccessKeyを変更してください。

  4. pom.xmlの設定は次のようになります。

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <modelVersion>4.0.0</modelVersion>
    6. <groupId>com.aliyun.odps.tunnel.example</groupId>
    7. <artifactId>UploadSample</artifactId>
    8. <version>1.0-SNAPSHOT</version>
    9. <dependencies>
    10. <dependency>
    11. <groupId>com.aliyun.odps</groupId>
    12. <artifactId>odps-sdk-core-internal</artifactId>
    13. <version>0.20.7</version>
    14. </dependency>
    15. </dependencies>
    16. <repositories>
    17. <repository>
    18. <id>alibaba</id>
    19. <name>alibaba Repository</name>
    20. <url>http://mvnrepo.alibaba-inc.com/nexus/content/groups/public/</url>
    21. </repository>
    22. </repositories>
    23. </project>
  5. コンパイルして実行します:プログラムをコンパイルUploadSample:

    1. mvn package

    UploadSampleプログラムを実行します。ここでは、Eclipseを使用してmavenプロジェクトをインポートします。Javaプログラムを右クリックし、 Maven-> Existing Maven Projects>をクリックすると、次のように設定されます。‘UploadSample.java’を右クリックし、次のように Run Configurations>をクリックします。<実行>をクリックします。正常に実行されると、コンソールには次のように表示されます。

    1. Session Status is : NORMAL
    2. upload success!
  6. 実行結果を確認する:コンソールに次の文を入力します。

    1. select * from tunnel_sample_test;

    その結果を以下に示す。

    1. +----+------+----+----+
    2. | id | name | pt | dt |
    3. +----+------+----+----+
    4. | sample | sample | 20150801 | hangzhou |
    5. | sample | sample | 20150801 | hangzhou |
    6. | sample | sample | 20150801 | hangzhou |
    7. | sample | sample | 20150801 | hangzhou |
    8. | sample | sample | 20150801 | hangzhou |
    9. | sample | sample | 20150801 | hangzhou |
    10. | sample | sample | 20150801 | hangzhou |
    11. | sample | sample | 20150801 | hangzhou |
    12. | sample | sample | 20150801 | hangzhou |
    13. | sample | sample | 20150801 | hangzhou |
    14. +----+------+----+----+

注意:

  • MaxComputeの独立したサービスとして、トンネルにはユーザー専用のアクセスポートがあります。同時に、内部のMaxComputeサービス用に複数のトンネルサービスが導入されており、生産部門と圧力のさまざまなニーズに対応しています。MaxComputeは、複数のアクセスアドレスセットに起因する混乱を防ぐため、アクセスポートのルーティング機能を提供します。詳細なアクセスポリシーは次のとおりです。
  • 運用ネットワークからトンネルサービスにアクセスする場合は、MaxComputeEndPointを指定するだけです。MaxComputeエンドポイントへの自動ルーティングをサポートするTunnel EndPointを設定する必要はありません。
  • オフィスネットワークからトンネルサービスにアクセスする場合は、MaxCompute EndPoint を指定するだけです。MaxComputeエンドポイントへの自動ルーティングをサポートするTunnel EndPointを設定する必要はありません。

Fluentdインポートスキーム

MaxCompute ConsoleとJava SDKに加えて、Fluentdからデータをインポートすることもできます。

Fluentdはオープンソースソフトウェアであり、さまざまなソースログ(ログアプリケーション、ログアクセス、ログシステムなど)を収集するために使用されます。これにより、ログデータをフィルタリングし、MySQL、Oracle、MongoDB、Hadoop、Treasure Data、AWS Services、Google Services、ODPSなどのさまざまなデータ処理クライアントに対応するプラグインを選択できます。Fluentdはコンパクトで柔軟性があり、ユーザーはデータソースをカスタマイズし、処理とターゲット端末をフィルタリングすることができます。現在、このソフトウェアにはFluentdアーキテクチャ上で300以上のプラグインが動作しており、これらのプラグインはすべてオープンソースです。MaxComputeはまた、このソフトウェアでデータインポートプラグインをオープンしました。

準備準備

このソフトウェアを使用してMaxComputeにデータをインポートするには、次の環境を準備する必要があります。

  • Ruby 2.1.0または更新済み
  • Gem 2.4.5または更新
  • Fluentd-0.10.49またはFluentd Official Websiteから最新バージョンを確認してください。Fluentdは異なるOSに異なるバージョンを提供します。詳細は、Fluentd Articlesを参照してください。
  • Protobuf-3.5.1または更新済み(Ruby protobuf)

インポートプラグインのインストール

次に、MaxCompute Fluentdインポートプラグインを次のいずれかの方法でインストールできます。

方法1:ruby gem を通してインストールします。

  1. $ gem install fluent-plugin-aliyun-odps

ODPSはこのプラグインをGEMにリリースしました。この名前は fluent-plugin-aliyun-odpsです。gem installコマンドでインストールするだけです。(gem の使用中に、gem にアクセスできない場合があります。この問題を解決するために、インターネットからchange gem sourseを検索することができます)。

方法2:pulg-inソースコードを使用してインストールする

  1. $ gem install protobuf
  2. $ gem install fluentd --no-ri --no-rdoc
  3. $ git clone https://github.com/aliyun/aliyun-odps-fluentd-plugin.git
  4. $ cp aliyun-odps-fluentd-plugin/lib/fluent/plugin/* {YOUR_FLUENTD_DIRECTORY}/lib/fluent/plugin/ -r

上記のコマンドでは、2番目のコマンドを使用してfluentdをインストールします。すでにインストールしている場合は、このコマンドを無視できます。MaxCompute Fluentdプラグインのソースコードはgithubにあります。クローンした後、Fluentdの pluginディレクトリに入れてください。

プラグインを使用する

Fluentdを使用してデータをインポートするには、Fluentdの confファイルを設定することが最も重要です。confファイルの詳細については、Fluentd設定ファイルの紹介を参照してください。

例1:Nginxログをインポートします。Confの sourceの設定は次のようになります。

  1. <source>
  2. <source>
  3. type tail
  4. path /opt/log/in/in.log
  5. pos_file /opt/log/in/in.log.pos
  6. refresh_interval 5s
  7. tag in.log
  8. format /^(?<remote>[^ ]*) - - \[(?<datetime>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*) "-" "(?<agent>[^\"]*)"$/
  9. time_format %Y%b%d %H:%M:%S %z
  10. </source>

Fluentdは、指定されたファイルの内容が tailによって変更されたかどうかを監視します。詳細設定については、Fluentd Articles一致の構成は次のとおりです。

  1. <match in.**>
  2. type aliyun_odps
  3. aliyun_access_id ************
  4. aliyun_access_key *********
  5. aliyun_odps_endpoint http://service.odps.aliyun.com/api
  6. aliyun_odps_hub_endpoint http://dh.odps.aliyun.com
  7. buffer_chunk_limit 2m
  8. buffer_queue_limit 128
  9. flush_interval 5s
  10. project projectforlog
  11. <table in.log>
  12. table nginx_log
  13. fields remote,method,path,code,size,agent
  14. partition ctime=${datetime.strftime('%Y%m%d')}
  15. time_format %d/%b/%Y:%H:%M:%S %z
  16. </table>
  17. </match>

データはプロジェクト projectforlogのテーブルnginx_logにインポートされます。ソースデータの列「datatime」はパーティションとして扱われます。プラグインは、異なる値を満たすときに自動的にパーティションを作成します。

例2:MySqLのデータをインポートします。MySQLでデータをインポートする場合、データソースとして fluent-plugin-sqlをインストールする必要があります。

  1. $ gem install fluent-plugin-sql

confsourceを設定する:

  1. <source>
  2. type sql
  3. host 127.0.0.1
  4. database test
  5. adapter mysql
  6. username xxxx
  7. password xxxx
  8. select_interval 10s
  9. select_limit 100
  10. state_file /path/sql_state
  11. <table>
  12. table test_table
  13. tag in.sql
  14. update_column id
  15. </table>
  16. </source>

この例では、test_tableからデータを選択し、10秒ごとに100レコードを読み込みます。選択すると、ID列が主キーとなります(IDフィールドは自己拡張です)。fluent-plugin-sqlの詳細については、Fluentd SQLプラグインの説明を参照してください。

matchの設定は次のようになります。

  1. <match in.**>
  2. type aliyun_odps
  3. aliyun_access_id ************
  4. aliyun_access_key *********
  5. aliyun_odps_endpoint http://service.odps.aliyun.com/api
  6. aliyun_odps_hub_endpoint http://dh.odps.aliyun.com
  7. buffer_chunk_limit 2m
  8. buffer_queue_limit 128
  9. flush_interval 5s
  10. project your_projectforlog
  11. <table in.log>
  12. table mysql_data
  13. fields id,field1,field2,fields3
  14. </table>
  15. </match>

データは、projectforlogプロジェクトの mysql_dataテーブルにインポートされます。インポートされるフィールドには、id、field1、field2、およびfield3が含まれます。

プラグインパラメータの説明

MaxComputeにデータをインポートするには、’conf’ファイルの’match’項目にMaxComputeプラグインを設定する必要があります。サポートされているパラメータの説明は次のとおりです。

  • type(固定):固定値、aliyun_odps。
  • aliyun_access_id(必須):access_id。
  • aliyun_access_key(必須):アクセスキー。
  • aliyun_odps_hub_endpoint(必須):サーバーがECSにデプロイされている場合は、この値を http://dh-ext.odps.aliyun-inc.com に設定します。それ以外の場合は、http://dh.odps.aliyun.com に設定します。
  • aliyunodps_endpoint(必須):サーバーがESCにデプロイされている場合は、この値を http://odps-ext.aiyun-inc.com/apiに設定します。それ以外の場合は、http://service.odps.aliyun.com/apiに設定します。
  • buffer_chunk_limit(オプション):bolockサイズで、単位は “k”(KB)、 “m”(MB)、 “g”(GB)をサポートします。デフォルト値は8MBです。推奨値は2MBです。
  • buffer_queue_limit(オプション):ブロックキューのサイズ。この値と’buffer_chunk_limit’は、バッファ全体のサイズを決定します。
  • flush_interval(オプション):必要とされる送信間隔。時間が閾値に達し、ブロックデータが満たされない場合は、メッセージを必須に送信します。defalt値は60です。
  • project(必須):プロジェクト名。
  • table(必須):テーブル名。
  • fields(必須): sourceに対応します。フィールド名は sourceでなければなりません。
  • partition(オプション):テーブルがパーティションテーブルの場合、この項目を設定します。
  • パーティション名の設定モード:
    • 固定値:パーティションctime = 20150804
    • キーワード:パーティションctime = $ {リモート}(リモートは ソースのフィールドです)
    • datetimesourceの時刻書式フィールドです。パーティション名の出力形式は%Y%m%dです。)。
    • time_format(オプション):

Flume

Fluentdを使用してdatをインポートするほかに、MaxComputeはFlumeを通じてデータをインポートすることもサポートしています。FlumeはApacheのオープンソースソフトウェアです。MaxComputeはFlumeに基づいてインポートプラグインのソースコードをオープンしました。詳細については、Flume MaxCompute Plug-inを参照してください。