このページでは、E-MapReduce SDK を使用して Spark で MaxCompute データを読み書きする方法について説明します。

Spark への MaxCompute アクセス許可

  1. OdpsOps オブジェクトを初期化します。 Spark では、MaxCompute のデータ操作は OdpsOps クラスを使用して実行されます。 OdpsOps オブジェクトを作成するには、以下のステップを実行します。
    import com.aliyun.odps.TableSchema
         import com.aliyun.odps.data.Record
         import org.apache.spark.aliyun.odps.OdpsOps
         import org.apache.spark.{ SparkContext, SparkConf}
         object Sample {
           def main(args: Array[String]): Unit = {    
             // == Step-1 ==
             val accessKeyId = "<accessKeyId>"
             val accessKeySecret = "<accessKeySecret>"
             // Take the internal network address as an example
             val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") 
             val conf = new SparkConf().setAppName("Test Odps")
             val sc = new SparkContext(conf)
             val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
             // A part of the calling code is shown as follows
             // == Step-2 ==
             ...
             // == Step-3 ==
             ...
           }
           // == Step-2 ==
           // Method definition 1
           // == Step-3 ==
           // Method definition 2
         }
  2. MaxCompute からテーブルデータを Spark に読み込みます。 以下のとおり、OdpsOps オブジェクトの readTable メソッドを使用することで MaxCompute テーブルを Spark に読み込み RDD を作成できます。
    // == Step-2 ==
             val project = <odps-project>
             val table = <odps-table>
             val numPartitions = 2
             val inputData = odpsOps.readTable(project, table, read, numPartitions)
             inputData.top(10).foreach(println)
             // == Step-3 ==
             ...
    上記のコードでは、MaxCompute テーブルのデータを解析して事前処理するため以下のとおり読み取り関数を定義する必要があります。
    def read(record: Record, schema: TableSchema): String = {
               record.getString(0)
             }

    この関数により、MaxCompute テーブルの先頭の列を Spark 実行時環境に読み込みます。

  3. Spark の結果データを MaxCompute テーブルに保存します。 OdpsOps オブジェクトの saveToTable メソッドを使用して、Spark RDD を MaxCompute に保存できます。
    val resultData = inputData.map(e => s"$e has been processed.")
             odpsOps.saveToTable(project, table, dataRDD, write)
    上記のコードでは、MaxCompute テーブルに書き込む前に以下のとおりデータ事前処理用の書き込み関数を定義する必要があります。
    def write(s: String, emptyReord: Record, schema: TableSchema): Unit = {
               val r = emptyReord
               r.set(0, s)
             }

    この関数により、各行の RDD データを対応する MaxCompute テーブルの先頭の列に書き込みます。

  4. パーティションテーブルパラメーターの表記

    SDK は、MaxCompute パーティションテーブルの読み書きをサポートしています。 テーブルの標準命名規則は partition_column_name=partition_name (複数のパーティションをコンマで区切ります) です。 パーティション列 pt と ps があるとします。

    • pt が 1 のパーティションのテーブルデータを読み取ります。
    • pt が 1、ps が 2 のパーティションのテーブルデータを読み取ります。

付録

完全なサンプルコードについては、以下をご参照ください。Spark への MaxCompute アクセス許可