このページでは、E-MapReduce クラスター上で MapReduce ジョブを作成して実行する方法について説明します。

MapReduce での OSS 利用

MapReduce で OSS に対してデータを読み書きするには、以下のパラメータを設定します。
conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint","${endpoint}");
パラメーター説明 :
  • ${accessKeyId}: お客様のアカウントの AccessKey ID

  • ${accessKeySecret}: AccessKey ID に対応する AccessKey Secret

  • ${endpoint}: OSS へのアクセスに使用するネットワーク。 クラスターが存在するリージョンによって異なり、対応する OSS もクラスターが存在するリージョンにある必要があります。

    特定の値の詳細は、 「OSS エンドポイント (OSS endpoints)」をご参照ください。

ワードカウント

以下の例では、OSS からテキストを読み取り、ワードカウントを計算する方法について説明します。 手順は以下のとおりです。

  1. プログラムの記述

    Java コードを例とします。 Hadoop の公式 Web サイトの WordCount サンプルを以下のように変更します。コードに AccessKey ID と AccessKey Secret の設定を追加して、ジョブが OSS ファイルにアクセスする権限を持つようにインスタンスを変更します。

    package org.apache.hadoop.examples;
     import java.io.IOException;
     import java.util.StringTokenizer;
     import org.apache.hadoop.conf.Configuration;
     import org.apache.hadoop.fs.Path;
     import org.apache.hadoop.io.IntWritable;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mapreduce.Job;
     import org.apache.hadoop.mapreduce.Mapper;
     import org.apache.hadoop.mapreduce.Reducer;
     import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     import org.apache.hadoop.util.GenericOptionsParser;
     public class EmrWordCount {
      public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable>{
         private final static IntWritable one = new IntWritable(1);
         private Text word = new Text();
         public void map(Object key, Text value, Context context
                         ) throws IOException, InterruptedException {
           StringTokenizer itr = new StringTokenizer(value.toString());
           while (itr.hasMoreTokens()) {
             word.set(itr.nextToken());
             context.write(word, one);
           }
         }
       }
       public static class IntSumReducer
            extends Reducer<Text,IntWritable,Text,IntWritable> {
         private IntWritable result = new IntWritable();
          public void reduce(Text key, Iterable<IntWritable> values,
                            Context context
                            ) throws IOException, InterruptedException {
           int sum = 0;
           for (IntWritable val : values) {
             sum += val.get();
           }
           result.set(sum);
           context.write(key, result);
         }
       }
       public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
         if (otherArgs.length < 2) {
           System.err.println("Usage: wordcount <in> [<in>...] <out>");
           System.exit(2);
         }
         conf.set("fs.oss.accessKeyId", "${accessKeyId}");
         conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
         conf.set("fs.oss.endpoint","${endpoint}");
         Job job = Job.getInstance(conf, "word count");
         job.setJarByClass(EmrWordCount.class);
         job.setMapperClass(TokenizerMapper.class);
         job.setCombinerClass(IntSumReducer.class);
         job.setReducerClass(IntSumReducer.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);
         for (int i = 0; i < otherArgs.length - 1; ++i) {
           FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
         }
         FileOutputFormat.setOutputPath(job,
           new Path(otherArgs[otherArgs.length - 1]));
         System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
      }
  2. プログラムのコンパイル

    まず、JDK 環境と Hadoop 環境を設定してから、以下の操作を実行する必要があります。

    mkdir wordcount_classes
     javac -classpath ${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.6.0.jar:${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar:${HADOOP_HOME}/share/hadoop/common/lib/commons-cli-1.2.jar -d wordcount_classes EmrWordCount.java
     jar cvf wordcount.jar -C wordcount_classes .
  3. ジョブの作成
    • 前のステップで準備した JAR ファイルを OSS にアップロードします。 詳細な操作は、OSS Web サイトにログインしてご確認ください。 OSS 上の JAR ファイルのパスが oss://emr/jars/wordcount.jar、入力パスと出力パスは oss://emr/data/WordCount/Input および oss://emr/data/WordCount/Output であるとします。
    • 以下のとおり E-MapReduce ジョブを作成します。
  4. 実行プランの作成

    E-MapReduce で実行プランを作成し、作成したジョブを実行プランに追加します。 ポリシーとして [今すぐ実行] を選択し、WordCount ジョブが選択したクラスターで実行されるようにします。

Maven を使用して MR ジョブを管理

プロジェクトのサイズが大きくなると、管理はかなり複雑になります。 プロジェクトを管理するには、Maven または同様のソフトウェア管理ツールを使用するよう推奨します。 手順は以下のとおりです。

  1. Maven のインストール

    まず、Maven がインストール済みであることを確認します。

  2. プロジェクトフレームワークを生成します。

    プロジェクトのルートディレクトリ (プロジェクトのルートディレクトリを D:/workspace と仮定) で、以下のコマンドを実行します。

    mvn archetype:generate -DgroupId=com.aliyun.emr.hadoop.examples -DartifactId=wordcountv2 -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

    Maven は自動的に空のサンプルプロジェクトを基本の pom.xml ファイルと App クラス (クラスパッケージパスは指定済みの groupId と同じ) を含む D:/workspace/wordcountv2 (指定済みの artifactId と同じ) に生成します。

  3. Hadoop 依存関係の追加

    お気に入りの IDE でプロジェクトを開き、pom.xml ファイルを編集します。 以下の内容を依存関係に追加します。

    <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-common</artifactId>
             <version>2.6.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>2.6.0</version>
         </dependency>
  4. プログラムの記述

    com.aliyun.emr.hadoop.examples パッケージの下の App クラスと同じディレクトリレベルに WordCount2.java という名前の新しいクラスを追加します。 コンテンツは以下のとおりです。

    package com.aliyun.emr.hadoop.examples;
     import java.io.BufferedReader;
     import java.io.FileReader;
     import java.io.IOException;
     import java.net.URI;
     import java.util.ArrayList;
     import java.util.HashSet;
     import java.util.List;
     import java.util.Set;
     import java.util.StringTokenizer;
     import org.apache.hadoop.conf.Configuration;
     import org.apache.hadoop.fs.Path;
     import org.apache.hadoop.io.IntWritable;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mapreduce.Job;
     import org.apache.hadoop.mapreduce.Mapper;
     import org.apache.hadoop.mapreduce.Reducer;
     import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     import org.apache.hadoop.mapreduce.Counter;
     import org.apache.hadoop.util.GenericOptionsParser;
     import org.apache.hadoop.util.StringUtils;
     public class WordCount2 {
         public static class TokenizerMapper
                 extends Mapper<Object, Text, Text, IntWritable>{
             static enum CountersEnum { INPUT_WORDS }
             private final static IntWritable one = new IntWritable(1);
             private Text word = new Text();
             private boolean caseSensitive;
             private Set<String> patternsToSkip = new HashSet<String>();
             private Configuration conf;
             private BufferedReader fis;
             @Override
             public void setup(Context context) throws IOException,
                     InterruptedException {
                 conf = context.getConfiguration();
                 caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
                 if (conf.getBoolean("wordcount.skip.patterns", true)) {
                     URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
                     for (URI patternsURI : patternsURIs) {
                         Path patternsPath = new Path(patternsURI.getPath());
                         String patternsFileName = patternsPath.getName().toString();
                         parseSkipFile(patternsFileName);
                     }
                 }
             }
             private void parseSkipFile(String fileName) {
                 try {
                     fis = new BufferedReader(new FileReader(fileName));
                     String pattern = null;
                     while ((pattern = fis.readLine()) ! = null) {
                         patternsToSkip.add(pattern);
                     }
                 } catch (IOException ioe) {
                     System.err.println("Caught exception while parsing the cached file '"
                             + StringUtils.stringifyException(ioe));
                 }
             }
             @Override
             public void map(Object key, Text value, Context context
             ) throws IOException, InterruptedException {
                 String line = (caseSensitive) ?
                         value.toString() : value.toString().toLowerCase();
                 for (String pattern : patternsToSkip) {
                     line = line.replaceAll(pattern, "");
                 }
                 StringTokenizer itr = new StringTokenizer(line);
                 while (itr.hasMoreTokens()) {
                     word.set(itr.nextToken());
                     context.write(word, one);
                     Counter counter = context.getCounter(CountersEnum.class.getName(),
                             CountersEnum.INPUT_WORDS.toString());
                     counter.increment(1);
                 }
             }
         }
         public static class IntSumReducer
                 extends Reducer<Text,IntWritable,Text,IntWritable> {
             private IntWritable result = new IntWritable();
             public void reduce(Text key, Iterable<IntWritable> values,
                                Context context
             ) throws IOException, InterruptedException {
                 int sum = 0;
                 for (IntWritable val : values) {
                     sum += val.get();
                 }
                 result.set(sum);
                 context.write(key, result);
             }
         }
         public static void main(String[] args) throws Exception {
             Configuration conf = new Configuration();
             conf.set("fs.oss.accessKeyId", "${accessKeyId}");
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
             conf.set("fs.oss.endpoint","${endpoint}");
             GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
             String[] remainingArgs = optionParser.getRemainingArgs();
             if (!( remainingArgs.length ! = 2 || remainingArgs.length ! = 4)) {
                 System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
                 System.exit(2);
             }
             Job job = Job.getInstance(conf, "word count");
             job.setJarByClass(WordCount2.class);
             job.setMapperClass(TokenizerMapper.class);
             job.setCombinerClass(IntSumReducer.class);
             job.setReducerClass(IntSumReducer.class);
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(IntWritable.class);
             List<String> otherArgs = new ArrayList<String>();
             for (int i=0; i < remainingArgs.length; ++i) {
                 if ("-skip".equals(remainingArgs[i])) {
                     job.addCacheFile(new Path(EMapReduceOSSUtil.buildOSSCompleteUri(remainingArgs[++i], conf)).toUri());
                     job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
                 } else {
                     otherArgs.add(remainingArgs[i]);
                 }
             }
             FileInputFormat.addInputPath(job, new Path(EMapReduceOSSUtil.buildOSSCompleteUri(otherArgs.get(0), conf)));
             FileOutputFormat.setOutputPath(job, new Path(EMapReduceOSSUtil.buildOSSCompleteUri(otherArgs.get(1), conf)));
             System.exit(job.waitForCompletion(true) ? 0 : 1);
         }
     }
    同じ WordCount2 のディレクトリにある EMapReduceOSSUtil クラスの以下のサンプルコードを参照します。
    package com.aliyun.emr.hadoop.examples;
     import org.apache.hadoop.conf.Configuration;
     public class EMapReduceOSSUtil {
         private static String SCHEMA = "oss://";
         private static String AKSEP = ":";
         private static String BKTSEP = "@";
         private static String EPSEP = ".";
         private static String HTTP_HEADER = "http://";
         /**
          * complete OSS uri
          * convert uri like: oss://bucket/path  to  oss://accessKeyId:accessKeySecret@bucket.endpoint/path
          * ossref do not need this
          *
          * @param oriUri original OSS uri
          */
         public static String buildOSSCompleteUri(String oriUri, String akId, String akSecret, String endpoint) {
             if (akId == null) {
                 System.err.println("miss accessKeyId");
                 return oriUri;
             }
             if (akSecret == null) {
                 System.err.println("miss accessKeySecret");
                 return oriUri;
             }
             if (endpoint == null) {
                 System.err.println("miss endpoint");
                 return oriUri;
             }
             int index = oriUri.indexOf(SCHEMA);
             if (index == -1 || index ! = 0) {
                 return oriUri;
             }
             int bucketIndex = index + SCHEMA.length();
             int pathIndex = oriUri.indexOf("/", bucketIndex);
             String bucket = null;
             if (pathIndex == -1) {
                 bucket = oriUri.substring(bucketIndex);
             } else {
                 bucket = oriUri.substring(bucketIndex, pathIndex);
             }
             StringBuilder retUri = new StringBuilder();
             retUri.append(SCHEMA)
                     .append(akId)
                     .append(AKSEP)
                     .append(akSecret)
                     .append(BKTSEP)
                     .append(bucket)
                     .append(EPSEP)
                     .append(stripHttp(endpoint));
             if (pathIndex > 0) {
                 retUri.append(oriUri.substring(pathIndex));
             }
             return retUri.toString();
         }
         public static String buildOSSCompleteUri(String oriUri, Configuration conf) {
             return buildOSSCompleteUri(oriUri, conf.get("fs.oss.accessKeyId"), conf.get("fs.oss.accessKeySecret"), conf.get("fs.oss.endpoint"));
         }
         private static String stripHttp(String endpoint) {
             if (endpoint.startsWith(HTTP_HEADER)) {
                 return endpoint.substring(HTTP_HEADER.length());
             }
             return endpoint;
         }
     }
  5. コードコンパイル、パッケージ化、およびアップロード

    プロジェクトディレクトリで、以下のコマンドを実行します。

    mvn clean package -DskipTests

    プロジェクトディレクトリのターゲットディレクトリに、ジョブの JAR パッケージである wordcountv2-1.0-SNAPSHOT.jar ファイルが表示されます。 JAR パッケージを OSS にアップロードします。

  6. ジョブの作成

    以下のパラメーターを使用して E-MapReduce に新しいジョブを作成します。

    jar ossref://yourBucket/yourPath/wordcountv2-1.0-SNAPSHOT.jar com.aliyun.emr.hadoop.examples.WordCount2 -Dwordcount.case.sensitive=true oss://yourBucket/yourPath/The_Sorrows_of_Young_Werther.txt oss://yourBucket/yourPath/output -skip oss://yourBucket/yourPath/patterns.txt

    ここで、yourBucket は OSS バケットを表し、yourPath はバケット内のパスを表します。 必要に応じて設定します。 関連するリソースを処理するファイル、oss://yourBucket/yourPath/The_Sorrows_of_Young_Werther.txt および oss://yourBucket/yourPath/patterns.txt をダウンロード後、OSS に格納する必要があります。 ジョブに必要なリソースをダウンロードし、それらのリソースを OSS 内の対応するディレクトリに格納できます。

    ダウンロードするファイル: The_Sorrows_of_Young_Werther.txtpatterns.txt
  7. 実行プランの作成および実行

    E-MapReduce で実行プランを作成し、それをジョブと関連付けてから実行プランを実行します。