ここでは、一般的な MapReduce インターフェイスを説明します。

Maven を使用している場合は、必要な Java SDK (さまざまなバージョンで利用できます) を入手するために Maven ライブラリから "odps-sdk-mapred" を検索できます。 設定は次のとおりです。
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-mapred</artifactId>
    <version>0.20.7-public</version>
</dependency>
インターフェイス 説明
MapperBase このクラスから継承するには、ユーザー定義の Map 関数が必要です。 入力テーブルのレコードオブジェクトを処理し、キー値を処理してオブジェクトにして、値を Reduce ステージへ出力します。または結果レコードを Reduce ステージを通さずに結果テーブルへ出力します。 Reduce ステージを通さず、直接計算結果を出力するジョブを Map-Only ジョブと呼びます。
ReducerBase カスタマイズされた Reduce 関数はこのクラスを継承する必要があります。 キーと関連がある値のセットは減少します。
TaskContext MapperBase と ReducerBase の複数のメンバ関数の入力パラメータの 1 つです。 タスクについてのコンテキスト情報が含まれます。
JobClient ジョブを送信し、管理するために使用します。 送信モードはブロッキング (同期) モード、またはノンブロッキング (非同期) モードを含みます。
RunningJob ジョブ実行中のオブジェクトを表示し、ジョブ実行プロセス中に MapReduce ジョブインスタンスをトレースするために利用されます。
JobConf MapReduce タスクの設定を説明します。 JobConf オブジェクトは 通常メインプログラム (main 関数) で定義され、ジョブは JobClient によって MaxCompute へ送信されます。

MapperBase

主な関数インターフェイスは次のとおりです。
インターフェイス 説明
void cleanup(TaskContext context) Map メソッドは map ステージが終了した後に呼び出されます。
void map(long key, Record record, TaskContext context) Map メソッドは入力テーブルのレコードを処理します。
void setup(TaskContext context) Map メソッドは map ステージが開始される前に呼び出されます。

ReducerBase

主な関数インターフェイスは次のとおりです。
インターフェイス 説明
void cleanup( TaskContext context) Reduce メソッドは reduce ステージが終了した後に呼び出されます。
void reduce(Record key, Iterator<Record > values, TaskContext context) Reduce メソッドは入力テーブルのレコードを処理します。
void setup( TaskContext context) Reduce メソッドは reduceステージが開始される前に呼び出されます。

TaskContext

主な関数インターフェイスは次のとおりです。
インターフェイス 説明
TableInfo[] getOutputTableInfo() 出力テーブルの情報を取得します。
Record createOutputRecord() デフォルトの出力テーブルのレコードオブジェクトを作成します。
Record createOutputRecord(String label) 指定されたラベルを持つ出力テーブルのレコードオブジェクトを作成します。
Record createMapOutputKeyRecord() Map によって出力されたレコードオブジェクトのキーを作成します。
Record createMapOutputValueRecord() Map によって出力されたレコードオブジェクトの値を作成します。
void write(Record record) レコードをデフォルト出力に書き込みます。Reduce クライアントによって出力データの書き込みに使用されます。Reduce クライアント上で複数回呼び出すことができます。
void write(Record record, String label) レコードを指定されたラベルの出力に書き込みます。Reduce クライアントによって出力データの書き込みに使用されます。Reduce クライアント上で複数回呼び出すことができます。
void write(Record key, Record value) Map は中間結果のレコードを書き込みます。 Map 関数の中で呼び出すことができ、 Map クライアント上で複数回呼び出すことができます。
BufferedInputStream readResourceFileAsStream(String resourceName) ファイルタイプリソースを読み取ります。
Iterator<Record > readResourceTable(String resourceName) テーブルタイプリソースを読み取ります。
Counter getCounter(Enum<? > > name) 指定された名前の Counter オブジェクトを取得します。
Counter getCounter(String group, String name) 指定された名前とグループ名の Counter オブジェクトを取得します。
void progress() ハートビートの情報を MapReduce フレームワークへ通知します。 ユーザーのメソッドの処理に長時間かかる場合や、プロセス内で呼び出されるフレームワークがない場合に、タスクのタイムアウトを防ぐためにこのメソッドを呼び出します。 フレームワークのタイムアウトはデフォルトで 600 秒に設定されています。
重要
  • MaxCompute TaskContext インターフェイスは progress 関数を提供しますが、この関数は Worker が長時間実行されたときに終了されるのを防ぎます。フレームワークはこの Worker をタイムアウトした Worker とみなします。このインターフェイスはハートビート情報をフレームワークへ送信するのと似ていますが、Worker の進捗は通知しません。
  • MaxCompute MapReduce Worker のデフォルトのタイムアウトスケジュールは 10 分間です (システムデフォルト、ユーザーは制御できません)。 スケジュールが 10 分を超え、 Worker がハートビート情報をフレームワークへ送信できない場合 (progress インターフェイスを呼び出さない)、フレームワークはこの Worker を中断しなければならず、 MapReduce タスクは失敗して終了します。worker がフレームワークによって終了されることを防ぐために、Mapper/Reducer 関数内で定期的に progress インターフェイスを呼び出すことを推奨します。

JobConf

主な関数インターフェイスは次のとおりです。
インターフェイス 説明
void setResources(String resourceNames) このジョブで使用されているリソースを宣言します。 Mapper/Reducer の処理実行中は、宣言されたリソースのみが TaskContext オブジェクトによって読み取られます。
void setMapOutputKeySchema(Column[] schema) Mapper から Reducer へ出力されたキー属性を設定します。
void setMapOutputValueSchema(Column[] schema) Mapper から Reducer へ出力された属性値を設定します。
void setOutputKeySortColumns(String[] cols) Mapper から Reducer へ出力された、ソートするキー列を設定します。
void setOutputGroupingColumns(String[] cols) グループ化されたキー列を設定します。
void setMapperClass(Class<? extends Mapper > theClass) ジョブの Mapper 関数を設定します。
void setPartitionColumns(String[] cols) ジョブで指定されたパーティション列を設定します。デフォルトは Mapper によって出力されたキーのすべての列です。
void setReducerClass(Class<?  extends Reducer  theClass) ジョブの Reducer を設定します。
void setCombinerClass(Class<? extends Reducer  theClass) Map クライアント上で実行される、ジョブのコンバイナを設定します。 この関数は単一の Map による同一のローカルキーバリューに対する Reduce 操作と似ています。
void setSplitSize(long size) 入力スライスのサイズを設定します。 単位: MB デフォルト値は 640 です。
void setNumReduceTasks(int n) Reduce タスクの数を設定します。 デフォルトは Mapper タスクの 1/4 です。
void setMemoryForMapTask(int mem) Mapper タスク内の単一 Worker のメモリサイズを設定します。 単位: MB デフォルト値は 2048 です。
void setMemoryForReduceTask(int mem) Reduce タスクの単一 Worker のメモリサイズを設定します。 単位: MB デフォルト値は 2048 です。
  • 通常、KeySortColumns と PartitionColumns はキーに含まれますが、GroupingColumns は KeySortColumns に含まれます。
  • Map 側では、マッパーの出力レコードは PartitionColumns を使用して計算されたハッシュ値に従って reducer へ配信され、KeySortColumns でソートされます。
  • Reduce 側では、 KeySortColumns によってソートされた後、入力レコードは reduce 関数の入力グループとして順次グループ化されます。 つまり、同じ GroupingColumns 値のレコードは同じ入力グループとして扱われます。

JobClient

主な関数インターフェイスは次のとおりです。
インターフェイス 説明
static RunningJob runJob(JobConf job) 同期 (ブロッキング) モードで MapReduce ジョブを送信した直後に返ります。
static RunningJob submitJob(JobConf job) 非同期 (ノンブロッキング) モードで MapReduce ジョブを送信した直後に返ります。

RunningJob

主な関数インターフェイスは次のとおりです。
インターフェイス 説明
String getInstanceID() 実行ログとジョブ管理を確認するためのインスタンス ID を取得します。
boolean isComplete() ジョブが完了したかどうかを確認します。
boolean isSuccessful() ジョブインスタンスが成功したかどうかを確認します。
void waitForCompletion() ジョブインスタンスが完了するまで待機します。 通常は非同期モードで送信されたジョブに対して使用されます。
JobStatus getJobStatus() ジョブインスタンスのステータスを確認します。
void killJob() ジョブを終了します。
Counters getCounters() Counter の情報を取得します。

InputUtils

主な関数インターフェイスは次のとおりです。
インターフェイス 説明
static void addTable(TableInfo table, JobConf conf) タスク入力にテーブルを追加します。 複数回呼び出すことができます。 新しく追加されたテーブルは、追加モードで入力キューに追加されます。
static void setTables(TableInfo [] tables, JobConf conf) タスク入力にテーブルを追加します。

OutputUtils

主な関数インターフェイスは次のとおりです。
インターフェイス 説明
static void addTable(TableInfo table, JobConf conf) タスク出力にテーブルを追加します。 複数回呼び出すことができます。 同様に、新しく追加されたテーブルを、追加モードで出力キューに追加します。
static void setTables(TableInfo [] tables, JobConf conf) タスク出力に複数のテーブルを追加します。

パイプライン

パイプラインは MR2 のサブジェクトです。Pipeline.builder によって構築できます。 パイプラインは次のとおりです。
    public Builder addMapper(Class<? extends Mapper> mapper)
    public Builder addMapper(Class<? extends Mapper> mapper,
           column [] keyschema, column [] valueschema, string [] sortcols,
           SortOrder [] order, string [] partcols,
            Class<? extends Partitioner> theClass, String[] groupCols)
    public Builder addReducer(Class<? extends Reducer> reducer)
    public Builder addReducer(Class<? extends Reducer> reducer,
           column [] keyschema, column [] valueschema, string [] sortcols,
           SortOrder [] order, string [] partcols,
            Class<? extends Partitioner> theClass, String[] groupCols)
    public setoutputkeyschema builder (Column [] keyschema)
    public setoutputvalueschema builder (Column [] valueschema)
    public setoutputkeysortcolumns builder (String [] sortcols)
    public setoutputkeysortorder builder (Sortorder [] order)
    public setpartitioncolumns builder (String [] partcols)
    public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
    void setOutputGroupingColumns(String[] cols)
例:
    job job = new job ();
    pipeline pipeline = pipeline. builder ()
     . addmapper (Tokenizermapper. class)
     . setoutputkeyschema (
         new column [] {new column ("word", OdpsType. string)})
     . setoutputvalueschema (
         new column [] {new column ("count", OdpsType. bigint)})
     . addreducer (Sumreducer. class)
     . setoutputkeyschema (
         new column [] {new column ("count", OdpsType. bigint)})
     . setoutputvalueschema (
         new column [] {new column ("word", OdpsType. string),
         new column ("count", OdpsType. bigint)})
     . addreducer (Identityreducer. class). createPipeline ();
    job. setpipeline (pipeline);  
    job. addinput (...)
    job. addoutput (...)
    job. submit ();
前述の例で示されるように、ユーザーは main クラスで Map を構築し、 続けて 2 つの Reduce の MapReduce タスクを取得できます。MapReduce の基本的な機能に慣れている場合は、機能が似ているので、MR2 も使用できます。
  • 具体的には、ユーザーが JobConf によって MapReduce タスクの設定を完了することを推奨します。
  • JobConf は Map を設定した後にのみ、単一の Reduce の MapReduce タスクを取得することができます。

データ型

MapReduce でサポートされているデータ型に含まれるのは、 BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME です。 MaxCompute データ型と Java データ型の間の MaxCompute は次のとおりです。
MaxCompute SQL データ型 Java データ型
Bigint Long
String String
Double Double
Boolean Boolean
Datetime Date
Decimal BigDecimal