データ準備

Table Store 内のテーブルに "pet" という名前を付け、次のデータをインポートします。 名前列が唯一のプライマリキーです。

名前 管理者 種類 性別 誕生日 死亡日
Fluffy Harold ネコ メス 1993-02-04
Claws Gwen ネコ オス 1994-03-17
Buffy Harold イヌ メス 1989-05-13
Fang Benny イヌ オス 1990-08-27
Bowser Diane イヌ オス 1979-08-31 1995-07-29
Chirpy Gwen トリ メス 1998-09-11
Whistler Gwen トリ 1997-12-09
Slim Benny ヘビ オス 1996-04-29
Puffball Diane ハムスター メス 1999-03-30
(データモデルトピックに従って) Table Store はスキーマフリーなので、空白のセルには何も入力する必要はありません ( NULL など)。

Hive によるアクセス例

準備

Hadoop、Hive、JDK のための環境と Table Store SDK および EMR の依存関係パッケージを必須条件として準備します。

# HADOOP_HOME と HADOOP_CLASSPATH を /etc/profile に追加できます。
$ export HADOOP_HOME=${Your Hadoop Path}
$ export HADOOP_CLASSPATH=emr-tablestore-1.4.2.jar:tablestore-4.3.1-jar-with-dependencies.jar:joda-time-2.9.4.jar
$ bin/hive
hive> CREATE EXTERNAL TABLE pet
  (name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING)
  STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
  WITH SERDEPROPERTIES(
    "tablestore.columns.mapping"="name,owner,species,sex,birth,death")
  TBLPROPERTIES (
    "tablestore.endpoint"="YourEndpoint",
    "tablestore.access_key_id"="YourAccessKeyId",
    "tablestore.access_key_secret"="YourAccessKeySecret",
    "tablestore.table.name"="pet");
hive> SELECT * FROM pet;
Bowser  Diane   dog     m       1979-08-31      1995-07-29
Buffy   Harold  dog     f       1989-05-13      NULL
Chirpy  Gwen    bird    f       1998-09-11      NULL
Claws   Gwen    cat     m       1994-03-17      NULL
Fang    Benny   dog     m       1990-08-27      NULL
Fluffy  Harold  cat     f       1993-02-04      NULL
Puffball        Diane   hamster f       1999-03-30      NULL
Slim    Benny   snake   m       1996-04-29      NULL
Whistler        Gwen    bird    NULL    1997-12-09      NULL
Time taken: 5.045 seconds, Fetched 9 row(s)
hive> SELECT * FROM pet WHERE birth > "1995-01-01";
Chirpy  Gwen    bird    f       1998-09-11      NULL
Puffball        Diane   hamster f       1999-03-30      NULL
Slim    Benny   snake   m       1996-04-29      NULL
Whistler        Gwen    bird    NULL    1997-12-09      NULL
Time taken: 1.41 seconds, Fetched 4 row(s)
			

パラメータ説明

  • WITH SERDEPROPERTIES

    tablestore.columns.mapping (オプション): デフォルトでは、外部テーブルのフィールド名 (Hiveの規約に従って小文字で表記) は、Table Store の列名 (プライマリキーまたは属性列の名前) と同じです。 ただし、大文字と小文字が区別されるか文字セットが原因で、名前が異なる場合があります。 この場合は、tablestore.columns.mapping を指定する必要があります。 このパラメータはカンマ区切りの文字列です。 コンマの前後に空白を追加することはできません。 各項目は列名であり、順序は外部表のフィールド名と同じです。

    Table Store は空白文字を含む列名をサポートします。つまり、空白は列名の一部と見なされます。
  • TBLPROPERTIES
    • tablestore.endpoint (必須): エンドポイントです。 Table Store コンソールでインスタンスのエンドポイント情報を表示できます。

    • tablestore.instance (オプション): インスタンス名です。 指定されていない場合は、tablestore.endpoint の最初のフィールドです。

    • tablestore.table.name (必須): Table Store 内のテーブル名。

    • tablestore.access_key_id、tablestore.access_key_secret (必須): 「アクセス制御」をご参照ください。

    • tablestore.sts_token (オプション): 「セキュリティトークン」をご参照ください。

HadoopMR によるアクセス例

次の例は、HadoopMR を使用して pet 内の行を数える方法を示しています。

コード例

  • マッパーとリデューサーの構築
    public class RowCounter {
    public static class RowCounterMapper
    extends Mapper<PrimaryKeyWritable, RowWritable, Text, LongWritable> {
        private final static Text agg = new Text("TOTAL");
        private final static LongWritable one = new LongWritable(1);
    
        @Override
        public void map(
            PrimaryKeyWritable key, RowWritable value, Context context)
            throws IOException, InterruptedException {
            context.write(agg, one);
        }
    }
    
    public static class IntSumReducer
    extends Reducer<Text,LongWritable,Text,LongWritable> {
    
        @Override
        public void reduce(
            Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable val : values) {
                sum += val.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }
    }
    					

    HadoopMR は pet から行を取得するたびに、マッパーの map() を呼び出します。 最初の2つのパラメーター PrimaryKeyWritable と RowWritable は、それぞれ行のプライマリキーとこの行の内容に対応しています。 PrimaryKeyWritable.getPrimaryKey() および RowWritable.getRow() を呼び出すことで、Table Store JAVA SDK によって定義されたプライマリキーオブジェクトと行オブジェクトを取得できます。

  • マッパーのデータソースとしてテーブルストアの設定
        private static RangeRowQueryCriteria fetchCriteria() {
            RangeRowQueryCriteria res = new     RangeRowQueryCriteria("YourTableName");
            res.setMaxVersions(1);
            List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
            List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
            lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
            upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
            res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
            res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
            return res;
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "row count");
            job.addFileToClassPath(new Path("hadoop-connector.jar"));
            job.setJarByClass(RowCounter.class);
            job.setMapperClass(RowCounterMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            job.setInputFormatClass(TableStoreInputFormat.class);
            TableStoreInputFormat.setEndpoint(job, "https://YourInstance.Region.ots.aliyuncs.com/");
            TableStoreInputFormat.setCredential(job, "YourAccessKeyId", "YourAccessKeySecret");
            TableStoreInputFormat.addCriteria(job, fetchCriteria());
            FileOutputFormat.setOutputPath(job, new Path("output"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    					

    上記の例では、job.setInputFormatClass(TableStoreInputFormat.class) を使用して Table Store をデータソースとして設定しています。 例を完了するには、次の手順も必要です。

    • hadoop-connector.jar をクラスタに展開して、それをクラスパスに追加します。 hadoop-connector.jar のローカルパスは addFileToClassPath() で指定されています。 コード例では、hadoop-connector.jar が現在のパスにあると仮定しています。

    • Table Store にアクセスするときにエンドポイントとアクセスキーを指定します。 エンドポイントとアクセスキーは TableStoreInputFormat.setEndpoint() と TableStoreInputFormat.setCredential() を使用して設定できます。

    • カウントするテーブルを指定してください。

      • TableStoreInputFormat.addCriteria() は複数回呼び出すことができます。 呼び出しごとに RangeRowQueryCriteria オブジェクトが追加されます。
      • setFilter() および addColumnsToGet() を設定して、サーバー側で不要な行と列をフィルタリングし、コストを削減し、Table Store のパフォーマンスを向上させます。
      • それらをマージするために RangeRowQueryCriterias を複数のテーブルに追加します。
      • 分割を調整するために、1 つのテーブルに複数の RangeRowQueryCriterias を追加します。 TableStore-Hadoop Connector は、指定された要件に基づいてユーザーの入力範囲を分割できます。

Host プログラムの実行

$ HADOOP_CLASSPATH=hadoop-connector.jar bin/hadoop jar row-counter.jar
...
$ find output -type f
output/_SUCCESS
output/part-r-00000
output/. _SUCCESS.crc
output/.part-r-00000.crc
$ cat out/part-r-00000
TOTAL   9
			

データ型変換

Table Store と Hive/Spark は異なるデータ型のセットをサポートします。

次の表は、Table Store (行) から Hive (列) へのデータ型変換のサポートを示しています。

TINYINT SMALLINT INT BIGINT FLOAT DOUBLE BOOLEAN STRING BINARY
INTEGER はい (精度は限られます) はい (精度は限られます) はい (精度は限られます) はい はい (精度は限られます) はい (精度は限られます)
DOUBLE はい (精度は限られます) はい (精度は限られます) はい (精度は限られます) はい (精度は限られます) はい (精度は限られます) はい
BOOLEAN はい
STRING はい
BINARY はい