データ準備

Table Store 内のテーブルに "pet" という名前を付け、データをインポートします。 名前 列が唯一のプライマリキーです。

名前 オーナー 種類 性別 誕生日 死亡日
Fluffy Harold ネコ メス 1993-02-04
Claws Gwen ネコ オス 1994-03-17
Buffy Harold イヌ メス 1989-05-13
Fang Benny イヌ オス 1990-08-27
Bowser Diane イヌ オス 1979-08-31 1995-07-29
Chirpy Gwen トリ メス 1998-09-11
Whistler Gwen トリ 1997-12-09
Slim Benny ヘビ オス 1996-04-29
Puffball Diane ハムスター メス 1999-03-30
データモデルに従うと Table Store はスキーマフリーなので、空白のセルには ( NULL など) 何も入れる必要はありません。

Spark SQL によるアクセスの例

準備

前提条件として、Spark、JDK の環境および Table Store SDK と EMR の依存関係パッケージを準備します。

$ bin/spark-sql --master local --jars tablestore-4.3.1-jar-with-dependencies.jar,emr-tablestore-1.4.2.jar
spark-sql> CREATE EXTERNAL TABLE pet
  (name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING)
  STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
  WITH SERDEPROPERTIES(
    "tablestore.columns.mapping"="name,owner,species,sex,birth,death")
  TBLPROPERTIES (
    "tablestore.endpoint"="YourEndpoint",
    "tablestore.access_key_id"="YourAccessKeyId",
    "tablestore.access_key_secret"="YourAccessKeySecret",
    "tablestore.table.name"="pet");
spark-sql> SELECT * FROM pet;
Bowser  Diane   dog     m       1979-08-31      1995-07-29
Buffy   Harold  dog     f       1989-05-13      NULL
Chirpy  Gwen    bird    f       1998-09-11      NULL
Claws   Gwen    cat     m       1994-03-17      NULL
Fang    Benny   dog     m       1990-08-27      NULL
Fluffy  Harold  cat     f       1993-02-04      NULL
Puffball        Diane   hamster f       1999-03-30      NULL
Slim    Benny   snake   m       1996-04-29      NULL
Whistler        Gwen    bird    NULL    1997-12-09      NULL
Time taken: 5.045 seconds, Fetched 9 row(s)
spark-sql> SELECT * FROM pet WHERE birth > "1995-01-01";
Chirpy  Gwen    bird    f       1998-09-11      NULL
Puffball        Diane   hamster f       1999-03-30      NULL
Slim    Benny   snake   m       1996-04-29      NULL
Whistler        Gwen    bird    NULL    1997-12-09      NULL
Time taken: 1.41 seconds, Fetched 4 row(s)
			

パラメーターの説明

  • WITH SERDEPROPERTIES
    • tablestore.columns.mapping (オプション): デフォルトでは、外部テーブルのフィールド名 (Hive のルールに従って小文字で表記) は、Table Store の列名 (プライマリキー列または属性列の名前) と同じです。 ただし、大文字と小文字が区別されるか文字セットが原因で、名前が異なる場合があります。 この場合は、 tablestore.columns.mapping を指定する必要があります。 このパラメーターはカンマ区切りの文字列です。 コンマの前後に空白を追加することはできません。 各項目は列名であり、順序は外部表のフィールド名と同じです。
      Table Store は、空白文字を含む列名をサポートします。 したがって、空白スペースは列名の一部と見なされます。
  • TBLPROPERTIES
    • tablestore.endpoint (必須): エンドポイントです。 Table Store コンソールでインスタンスのエンドポイント情報を表示できます。

    • tablestore.instance (オプション): インスタンス名です。 指定されていない場合は、tablestore.endpoint の最初のフィールドです。

    • tablestore.table.name (必須): Table Store 内のテーブル名。

    • tablestore.access_key_id および tablestore.access_key_secret (必須): 「アクセス制御」をご参照ください。

    • tablestore.sts_token (オプション): 「セキュリティトークン」をご参照ください。

Spark によるアクセスの例

次の例では、 pet 内の行数を Spark によって数える方法を示します。

private static RangeRowQueryCriteria fetchCriteria() {
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName");
    res.setMaxVersions(1);
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
    upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    return res;
}

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    Configuration hadoopConf = new Configuration();
    TableStoreInputFormat.setCredential(
        hadoopConf,
        new Credential("YourAccessKeyId", "YourAccessKeySecret"));
    TableStoreInputFormat.setEndpoint(
        hadoopConf,
        new Endpoint("https://YourInstance.Region.ots.aliyuncs.com/"));
    TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());

    try {
        JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
            hadoopConf,
            TableStoreInputFormat.class,
            PrimaryKeyWritable.class,
            RowWritable.class);
        System.out.println(
            new Formatter().format("TOTAL: %d", rdd.count()).toString());
    } finally {
        sc.close();
    }
}
			
scala を使用する場合は、JavaSparkContext を SparkContext に置き換え、JavaPairRDD を PairRDD に置き換えます。

プログラムの実行

$ bin/spark-submit --master local --jars hadoop-connector.jar row-counter.jar
TOTAL: 9
			

データ型変換

Table Store と Hive/Spark は異なる種類のデータ型をサポートします。

次の表は、Table Store (行) から Hive (列) へのデータ型変換のサポートを示しています。

TINYINT SMALLINT INT BIGINT FLOAT DOUBLE BOOLEAN STRING BINARY
INTEGER はい (精度は限られます) はい (精度は限られます) はい (精度は限られます) はい はい (精度は限られます) はい (精度は限られます)
DOUBLE はい (精度は限られます) はい (精度は限られます) はい (精度は限られます) はい (精度は限られます) はい (精度は限られます) はい
BOOLEAN はい
STRING はい
BINARY はい