データ準備
Table Store 内のテーブルに "pet" という名前を付け、次のデータをインポートします。 名前
列が唯一のプライマリキーです。
名前 | 管理者 | 種類 | 性別 | 誕生日 | 死亡日 |
---|---|---|---|---|---|
Fluffy | Harold | ネコ | メス | 1993-02-04 | |
Claws | Gwen | ネコ | オス | 1994-03-17 | |
Buffy | Harold | イヌ | メス | 1989-05-13 | |
Fang | Benny | イヌ | オス | 1990-08-27 | |
Bowser | Diane | イヌ | オス | 1979-08-31 | 1995-07-29 |
Chirpy | Gwen | トリ | メス | 1998-09-11 | |
Whistler | Gwen | トリ | 1997-12-09 | ||
Slim | Benny | ヘビ | オス | 1996-04-29 | |
Puffball | Diane | ハムスター | メス | 1999-03-30 |
NULL
など)。
Hive によるアクセス例
準備
Hadoop、Hive、JDK のための環境と Table Store SDK および EMR の依存関係パッケージを必須条件として準備します。
例
# HADOOP_HOME と HADOOP_CLASSPATH を /etc/profile に追加できます。
$ export HADOOP_HOME=${Your Hadoop Path}
$ export HADOOP_CLASSPATH=emr-tablestore-1.4.2.jar:tablestore-4.3.1-jar-with-dependencies.jar:joda-time-2.9.4.jar
$ bin/hive
hive> CREATE EXTERNAL TABLE pet
(name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING)
STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
WITH SERDEPROPERTIES(
"tablestore.columns.mapping"="name,owner,species,sex,birth,death")
TBLPROPERTIES (
"tablestore.endpoint"="YourEndpoint",
"tablestore.access_key_id"="YourAccessKeyId",
"tablestore.access_key_secret"="YourAccessKeySecret",
"tablestore.table.name"="pet");
hive> SELECT * FROM pet;
Bowser Diane dog m 1979-08-31 1995-07-29
Buffy Harold dog f 1989-05-13 NULL
Chirpy Gwen bird f 1998-09-11 NULL
Claws Gwen cat m 1994-03-17 NULL
Fang Benny dog m 1990-08-27 NULL
Fluffy Harold cat f 1993-02-04 NULL
Puffball Diane hamster f 1999-03-30 NULL
Slim Benny snake m 1996-04-29 NULL
Whistler Gwen bird NULL 1997-12-09 NULL
Time taken: 5.045 seconds, Fetched 9 row(s)
hive> SELECT * FROM pet WHERE birth > "1995-01-01";
Chirpy Gwen bird f 1998-09-11 NULL
Puffball Diane hamster f 1999-03-30 NULL
Slim Benny snake m 1996-04-29 NULL
Whistler Gwen bird NULL 1997-12-09 NULL
Time taken: 1.41 seconds, Fetched 4 row(s)
パラメータ説明
- WITH SERDEPROPERTIES
tablestore.columns.mapping (オプション): デフォルトでは、外部テーブルのフィールド名 (Hiveの規約に従って小文字で表記) は、Table Store の列名 (プライマリキーまたは属性列の名前) と同じです。 ただし、大文字と小文字が区別されるか文字セットが原因で、名前が異なる場合があります。 この場合は、tablestore.columns.mapping を指定する必要があります。 このパラメータはカンマ区切りの文字列です。 コンマの前後に空白を追加することはできません。 各項目は列名であり、順序は外部表のフィールド名と同じです。
注 Table Store は空白文字を含む列名をサポートします。つまり、空白は列名の一部と見なされます。 - TBLPROPERTIES
-
tablestore.endpoint (必須): エンドポイントです。 Table Store コンソールでインスタンスのエンドポイント情報を表示できます。
-
tablestore.instance (オプション): インスタンス名です。 指定されていない場合は、tablestore.endpoint の最初のフィールドです。
-
tablestore.table.name (必須): Table Store 内のテーブル名。
-
tablestore.access_key_id、tablestore.access_key_secret (必須): 「アクセス制御」をご参照ください。
-
tablestore.sts_token (オプション): 「セキュリティトークン」をご参照ください。
-
HadoopMR によるアクセス例
次の例は、HadoopMR を使用して pet 内の行を数える方法を示しています。
コード例
- マッパーとリデューサーの構築
public class RowCounter { public static class RowCounterMapper extends Mapper<PrimaryKeyWritable, RowWritable, Text, LongWritable> { private final static Text agg = new Text("TOTAL"); private final static LongWritable one = new LongWritable(1); @Override public void map( PrimaryKeyWritable key, RowWritable value, Context context) throws IOException, InterruptedException { context.write(agg, one); } } public static class IntSumReducer extends Reducer<Text,LongWritable,Text,LongWritable> { @Override public void reduce( Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable val : values) { sum += val.get(); } context.write(key, new LongWritable(sum)); } } }
HadoopMR は pet から行を取得するたびに、マッパーの map() を呼び出します。 最初の2つのパラメーター PrimaryKeyWritable と RowWritable は、それぞれ行のプライマリキーとこの行の内容に対応しています。 PrimaryKeyWritable.getPrimaryKey() および RowWritable.getRow() を呼び出すことで、Table Store JAVA SDK によって定義されたプライマリキーオブジェクトと行オブジェクトを取得できます。
- マッパーのデータソースとしてテーブルストアの設定
private static RangeRowQueryCriteria fetchCriteria() { RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName"); res.setMaxVersions(1); List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>(); List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>(); lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN)); upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX)); res.setInclusiveStartPrimaryKey(new PrimaryKey(lower)); res.setExclusiveEndPrimaryKey(new PrimaryKey(upper)); return res; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "row count"); job.addFileToClassPath(new Path("hadoop-connector.jar")); job.setJarByClass(RowCounter.class); job.setMapperClass(RowCounterMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setInputFormatClass(TableStoreInputFormat.class); TableStoreInputFormat.setEndpoint(job, "https://YourInstance.Region.ots.aliyuncs.com/"); TableStoreInputFormat.setCredential(job, "YourAccessKeyId", "YourAccessKeySecret"); TableStoreInputFormat.addCriteria(job, fetchCriteria()); FileOutputFormat.setOutputPath(job, new Path("output")); System.exit(job.waitForCompletion(true) ? 0 : 1); }
上記の例では、job.setInputFormatClass(TableStoreInputFormat.class) を使用して Table Store をデータソースとして設定しています。 例を完了するには、次の手順も必要です。
-
hadoop-connector.jar をクラスタに展開して、それをクラスパスに追加します。 hadoop-connector.jar のローカルパスは addFileToClassPath() で指定されています。 コード例では、hadoop-connector.jar が現在のパスにあると仮定しています。
-
Table Store にアクセスするときにエンドポイントとアクセスキーを指定します。 エンドポイントとアクセスキーは TableStoreInputFormat.setEndpoint() と TableStoreInputFormat.setCredential() を使用して設定できます。
-
カウントするテーブルを指定してください。
注- TableStoreInputFormat.addCriteria() は複数回呼び出すことができます。 呼び出しごとに RangeRowQueryCriteria オブジェクトが追加されます。
- setFilter() および addColumnsToGet() を設定して、サーバー側で不要な行と列をフィルタリングし、コストを削減し、Table Store のパフォーマンスを向上させます。
- それらをマージするために RangeRowQueryCriterias を複数のテーブルに追加します。
- 分割を調整するために、1 つのテーブルに複数の RangeRowQueryCriterias を追加します。 TableStore-Hadoop Connector は、指定された要件に基づいてユーザーの入力範囲を分割できます。
-
Host プログラムの実行
$ HADOOP_CLASSPATH=hadoop-connector.jar bin/hadoop jar row-counter.jar
...
$ find output -type f
output/_SUCCESS
output/part-r-00000
output/. _SUCCESS.crc
output/.part-r-00000.crc
$ cat out/part-r-00000
TOTAL 9
データ型変換
Table Store と Hive/Spark は異なるデータ型のセットをサポートします。
次の表は、Table Store (行) から Hive (列) へのデータ型変換のサポートを示しています。
TINYINT | SMALLINT | INT | BIGINT | FLOAT | DOUBLE | BOOLEAN | STRING | BINARY | |
---|---|---|---|---|---|---|---|---|---|
INTEGER | はい (精度は限られます) | はい (精度は限られます) | はい (精度は限られます) | はい | はい (精度は限られます) | はい (精度は限られます) | |||
DOUBLE | はい (精度は限られます) | はい (精度は限られます) | はい (精度は限られます) | はい (精度は限られます) | はい (精度は限られます) | はい | |||
BOOLEAN | はい | ||||||||
STRING | はい | ||||||||
BINARY | はい |