このページでは、Spark で Table Store データを消費する方法について説明します。

Spark への Table Store アクセス許可

  • テーブルを準備します。

pet という名前のテーブルを作成します。 名前列をプライマリキーフィールドに設定します。

名前 飼い主 種類 性別 誕生日 命日
ふわふわ ハロルド ネコ 1993-02-04
クローズ グウェン ネコ 1994-03-17
バフィ ハロルド 1989-05-13
ファング ベニー 1990-08-27
クッパ ダイアン 1979-08-31 1995-07-29
チャーピー グウェン 1998-09-11
ウィスラー グウェン 1997-12-09
スリム ベニー ヘビ 1996-04-29
パフボール ダイアン ハムスター 1999-03-30
  • 以下の例に、Spark が Table Store データを消費する方法を示します。
    private static RangeRowQueryCriteria fetchCriteria() {
        RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
        res.setMaxVersions(1);
        List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
        List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
        lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
        upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
        res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
        res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
        return res;
    }
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        Configuration hadoopConf = new Configuration();
        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(sparkConf);
            Configuration hadoopConf = new Configuration();
            TableStore.setCredential(
                    hadoopConf,
                    new Credential(accessKeyId, accessKeySecret, securityToken));
            Endpoint ep = new Endpoint(endpoint, instance);
            TableStore.setEndpoint(hadoopConf, ep);
            TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
            JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
                    hadoopConf, TableStoreInputFormat.class,
                    PrimaryKeyWritable.class, RowWritable.class);
            System.out.println(
                new Formatter().format("TOTAL: %d", rdd.count()).toString());
        } finally {
            if (sc ! = null) {
                sc.close();
            }
        }
    }

付録

完全なサンプルコードについては、以下をご参照ください。Spark への Table Store アクセス許可