ジョブの実行

MaxCompute コンソールには、MaxCompute Graph ジョブを実行するための jar コマンドが提供されています。 これらのコマンドは、MapReducejar コマンドを実行するのと同じ方法で使用されます。

ここでは、これらのコマンドを説明します。
Usage: jar [<GENERIC_OPTIONS>] <MAIN_CLASS> [ARGS]
                -conf <configuration_file> Specify an application configuration file
                -classpath <local_file_list> classpaths used to run mainClass
                -D <name>=<value> Property value pair, which is used to run mainClass
                -local Run job in local mode
                -resources <resource_name_list> file/table resources used in graph, separated by command
< GENERIC_OPTIONS> には、以下のパラメーターを指定できます (すべてオプション)。
  • -conf < configuration file >: JobConf 設定ファイルを指定します。
  • -classpath < local_file_list >: ローカル実装のクラスパスを示します。 これは主に main 関数を含む jar パッケージを指定するのに使用されます。

    main 関数と Graph ジョブは、通常同じパッケージに記述されます。たとえば、Single Source Shortest Path (SSSP) パッケージなどです。 したがって、サンプルコード内の -resources と -classpath パラメータには、jar パッケージが含まれています。 違いは -resources が Graph ジョブの値を参照し分散環境で実行されるのに対し、-classpath は main 関数を参照しローカル環境で実行されます。 指定された jar パッケージパスはローカルファイルパスでもあります。 パッケージ名はシステムのデフォルトのファイル区切り文字を使用して分割されます。 一般に、区切り文字は Windows システムではセミコロン (;)、Linux システムではカンマ (,) です。

  • -D < prop_name > = < prop_value >: ローカル実装のための < mainClass > の Java 属性を指定します。 複数の属性を定義できます。
  • -local: ローカルモードで Graph ジョブを実行します。主にプログラムのデバッグに使用されます。
  • -resources <resource_name_list >: Graph ジョブ実行のために使用されるリソース命令文を示します。一般に、Graph ジョブが配置されるリソース名は resource_name_list に指定されます。 Graph ジョブ内で他の MaxCompute リソースを読み込む場合、そのリソース名を resource_name_list に追加する必要があります。リソース名はカンマ (,) で区切ります。 プロジェクト間でリソースを使用する場合、PROJECT_NAME/resources/ を前につける必要があります。 たとえば、 -resources otherproject/resources/resfile; とします。
また、Graph ジョブの main 関数を実行すると、MaxCompute コンソールからジョブを送信するのではなく、MaxCompute にジョブが直接送信されます。 次のセクションでは、例として PageRank アルゴリズムを使用します。
public static void main(String[] args) throws Exception {
                    if (args.length < 2)
    printUsage();
  Account account = new AliyunAccount(accessId, accessKey);
  Odps odps = new Odps(account);
  odps.setEndpoint(endPoint);
  odps.setDefaultProject(project);
  SessionState ss = SessionState.get();
  ss.setOdps(odps);
  ss.setLocalRun(false);
  String resource = "mapreduce-examples.jar";
  GraphJob job = new GraphJob();
  // Add the JAR file in use and other files to class cache resource, corresponding to resources specified by -libjars in the JAR command
  job.addCacheResourcesToClassPath(resource);
  job.setGraphLoaderClass(PageRankVertexReader.class);
  job.setVertexClass(PageRankVertex.class);
  job.addInput(TableInfo.builder().tableName(args[0]).build());
  job.addOutput(TableInfo.builder().tableName(args[1]).build());
  // default max iteration is 30
  job.setMaxIteration(30);
  if (args.length >= 3)
    job.setMaxIteration(Integer.parseInt(args[2]));
  long startTime = System.currentTimeMillis();
  job.run();
  System.out.println("Job Finished in "
      + (System.currentTimeMillis() - startTime) / 1000.0
      + " seconds");

入出力

入出力の形式はカスタマイズできません。

次の例はジョブ入力を定義する方法です。 複数の入力がサポートされています。
GraphJob job = new GraphJob();
job.addInput(TableInfo.builder().tableName(“tblname”).build()); //Table as input
job.addInput(TableInfo.builder().tableName(“tblname”).partSpec("pt1=a/pt2=b").build()); //Shard as input
//Read-only columns col2 and col0 of the input table. In the load() method of GraphLoader, column col2 is obtained by record.get(0), and the sequence is the same
job.addInput(TableInfo.builder().tableName(“tblname”).partSpec("pt1=a/pt2=b").build(), new String[]{"col2", "col0"});
  • ジョブ入力の定義の詳細は、GraphJob の addInput() メソッドの説明をご参照ください。 フレームワークは入力テーブルのレコードを読み取り、カスタム GraphLoader へ送信してデータをロードします。
  • 制限事項: 現在、シャードフィルタリング条件はサポートされていません。 詳細については、「アプリケーションの制限」をご参照ください。
次の例はジョブ出力を定義する方法です。 複数のジョブ出力がサポートされています。各出力はラベルでマークされます。
GraphJob job = new GraphJob();
//If the output table is a shard table, the last level of shards must be provided
job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").build());
// Parameter true indicates overwriting shards specified by tableinfo, that is, the meaning of INSERT OVERWRITE. Parameter false indicates the meaning of INSERT INTO
job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").lable("output1").build(), true);
  • ジョブ出力の定義の詳細については、GraphJob の addOutput() メソッドの説明をご参照ください。
  • Graph ジョブが実行されると、WorkerContext の write() メソッドによりレコードが出力テーブルに書き込まれます。 前のセクションの "output1" のように複数出力に対してラベルを指定する必要があります。
  • 詳細については、「アプリケーションの制限」をご参照ください。

リソースの読み取り

  • Graph プログラムへリソースを追加する
    jar コマンドに加えて、以下の GraphJob メソッドを使用して Graph が読み取るリソースを指定することができます。
    void addCacheResources(String resourceNames)
    void addCacheResourcesToClassPath(String resourceNames)
  • Graph プログラムでリソースを使用する
    Graph プログラムのリソースを読み取るには、次の手順に従います。
    public byte[] readCacheFile(String resourceName) throws IOException;
                                        public Iterable<byte[]> readCacheArchive(String resourceName) throws IOException;
                                        public Iterable<byte[]> readCacheArchive(String resourceName, String relativePath)throws IOException;
                                        public Iterable<WritableRecord> readResourceTable(String resourceName);
    public BufferedInputStream readCacheFileAsStream(String resourceName) throws IOException;
    public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName) throws IOException;
    public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName, String relativePath) throws IOException;
    
    • 通常、リソースの読み取りには WorkerComputer の setup() メソッド (Worker Value に保存)、取得には getWorkerValue() メソッドが使用されます。
    • 全体的なメモリ消費を減らすには、前述のストリーム API を使用してリソースの読み取りと処理が同時に実行されるようにします。
    • 詳細については、「アプリケーションの制限」をご参照ください。