ここでは、Eclipse を使用して MapReduce プログラムを開発および実行する方法を説明します。

MaxCompute プロジェクトから WordCount の例を選択します。
[WordCount.java] を右クリックし、[Run As]、[ODPS MapReduce] の順に選択します。

ダイアログボックスが表示されたら、[example_project] を選択し、[Finish] をクリックします。

実行が完了すると、次の結果が表示されます。

ユーザー定義の MapReduce プログラムの実行

src ディレクトリを右クリックします。 [New] > [Mapper] の順に選択します。
Mapper を選択すると、次のダイアログボックスが表示されます。 Mapper クラスの名前を入力し、[Finish] をクリックします。
UserMapper.java ファイルが Package Explorer の src ディレクトリに生成されます。 このファイルは Mapper クラスのテンプレートです。
package odps;
import java.io.IOException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
public class UserMapper extends MapperBase {
    @Override
    public void setup(TaskContext context) throws IOException {
    
    @Override
    public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
    
    @Override
    public void cleanup(TaskContext context) throws IOException {
    

テンプレートでは、設定されたパッケージ名のデフォルトは odps になります。 実際の要件に応じて変更します。 テンプレートの内容を次のように記述します。

package odps;
import java.io.IOException;
import com.aliyun.odps.counter.Counter;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
public class UserMapper extends MapperBase {
    Record word;
    Record one;
    Counter gCnt;
    @Override
    public void setup(TaskContext context) throws IOException {
          word = context.createMapOutputKeyRecord();
          one = context.createMapOutputValueRecord();
          one.set(new Object[] { 1L });
          gCnt = context.getCounter("MyCounters", "global_counts");
    
    @Override
    public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
          for (int i = 0; i < record.getColumnCount(); i++) {
              String[] words = record.get(i).toString().split("\\s+");
              for (String w : words) {
                word.set(new Object[] { w });
                Counter cnt = context.getCounter("MyCounters", "map_outputs");
                cnt.increment(1);
                gCnt.increment(1);
                context.write(word, one);
              
            
          
    @Override
    public void cleanup(TaskContext context) throws IOException {
    

同様に、 src ディレクトリを右クリックし、[New] > [Reduce] の順に選択します。

Reduce クラスの名前を入力します。 この例では、クラス名として UserReduce を使用します。

Package Explorer では、ファイル名 UserReduce.java が src ディレクトリに生成されます。 このファイルの内容は、Reduce クラスのテンプレートです。 テンプレートを以下のように編集します。

package odps;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.counter.Counter;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;
public class UserReduce extends ReducerBase {
    private Record result;
    Counter gCnt;
    @Override
    public void setup(TaskContext context) throws IOException {
          result = context.createOutputRecord();
          gCnt = context.getCounter("MyCounters", "global_counts");
    
    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
            throws IOException {
          long count = 0;
          while (values.hasNext()) {
            Record val = values.next();
            count += (Long) val.get(0);
          
          result.set(0, key.get(0));
          result.set(1, count);
          Counter cnt = context.getCounter("MyCounters", "reduce_outputs");
          cnt.increment(1);
          gCnt.increment(1);
          context.write(result);
        
    @Override
    public void cleanup(TaskContext context) throws IOException {
    
main 関数の作成: src を右クリックし、[New] > [MapReduce Driver] の順に選択します。 ドライバ名 (この例では名前として "UserDriver" を使用)、Mapper および Reduce (この例では対応する名前として "UserMapper" と "UserReduce" を使用) を入力し、[Finish] をクリックします。 MyDriver.java ファイルも、src ディレクトリにも表示されます。
ドライバの内容を以下のように編集します。
package odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.examples.mr.WordCount.SumCombiner;
import com.aliyun.odps.examples.mr.WordCount.SumReducer;
import com.aliyun.odps.examples.mr.WordCount.TokenizerMapper;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
public class UserDriver {
    public static void main(String[] args) throws OdpsException {
        JobConf job = new JobConf();
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(SumCombiner.class);
        job.setReducerClass(SumReducer.class);
        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
        InputUtils.addTable(
            TableInfo.builder().tableName("wc_in1").cols(new String[] { "col2", "col3" }).build(), job);
        InputUtils.addTable(TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
        RunningJob rj = JobClient.runJob(job);
        rj.waitForCompletion();
    
MapReduce プログラムを実行します。 [UserDriver.java] を右クリックし、[Run As] > [ODPS MapReduce] の順に選択すると、次のダイアログボックスが表示されます。
MaxCompute プロジェクトとして [example_project] を選択し、[Finish] をクリックすると、MapReduce プログラムがローカルで実行されます。
上図と同じように出力された場合、ローカル操作が正常に実行されたことを示します。 出力結果は warehouse ディレクトリに保存されます。 MaxCompute プロジェクトを更新します。

wc_out は出力ディレクトリ、R_000000 は結果ファイルです。 ローカルデバッグにより、結果が正しいことを確認し、Eclipse のエクスポート機能を使用して MapReduce プログラムをパッケージ化します。 パッケージ化した後、jar パッケージを MaxCompute にアップロードします。 分散環境で MapReduce を実行する方法については、「クイックスタート」をご参照ください。

ローカルデバッグの完了後、分散環境用に提供されている Eclipse Export 機能を使用して、コードを jar パッケージにパッケージ化します。 この例では、パッケージ名は mr-examples.jar です。 src ディレクトリを選択し、[Export] をクリックします。
エクスポートモードとして Jar File を選択します。
src でパッケージのみをエクスポートする必要があります。 jar ファイルの名前を mr-examples.jar に指定する必要があります。

[Next] をクリックして jar ファイルをエクスポートします。

ローカルで新しいプロジェクトの作成をシミュレートする場合は、warehouse ディレクトリにサブディレクトリ (example_project と同じ階層) を作成します。 ディレクトリの階層構造は次のように表示されます。
<warehouse>
   |____example_project(Project Dirctory)
          |____ <__tables__>
          | |__table_name1(non-partition table)
          | | |____ data(File)
          
          | | |____ <__schema__> (File)
          
          | |__table_name2(Partition Table)
          | |____ partition_name=partition_value(partition directory)
          | | |____ data(file)
          
          | |____ <__schema__> (file)
          
          |____ <__resources__>
                  
                  |___table_resource_name (table resource)
                  | |____<__ref__>
                  
                  |___ file_resource_name(file resource)

schemaの例

Non-partiton table:
project=project_name
table=table_name
columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
Partition table:
project=project_name
table=table_name
columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
partitions=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
Note:
Currently, the following five data formats are supported: bigint,double,boolean,datetime,string, which correspond to the data types in java: -long,double,boolean,java.util.Date,java.lang.String.
データの例
1,1.1,true,2015-06-04 11:22:42 896,hello world
\N,\N,\N,\N,\N
Note: 
The time format is accurate to the millisecond level and all types are represented NULL by '\N'.
  • MapReduce プログラムがローカルで実行されている場合、デフォルトでは対応するテーブルまたはリソースが warehouse ディレクトリから検索されます。 テーブルまたはリソースが存在しない場合は、対応するデータがサーバーからダウンロードされ、warehouse に保存されます。 その後、ローカルで MapReduce を実行します。
  • MapReduce の実行が終了すると、warehouse のディレクトリが更新され、生成された結果が表示されます。