ロングテールは、分散コンピューティングでは一般的かつ難しい問題です。 データが不均等に分散されている場合は、各ノードのワークロードは異なります。 最も遅いノードの作業が完了するまで、タスク全体は終了しません。

この問題に対処するには、1 つの worker に特定して重いタスクを実行するのではなく、タスクを分割し、複数の worker に分散して実行します。 ここでは、ロングテールの典型的なケースに対処する方法を説明します。

JOIN のロングテール

原因

JOIN 文のキーに大量のデータがある場合は、ロングテールが発生します。

解決策

3 つの手順でこの問題を解決します。

  • 一方または両方のテーブルが、サイズの小さいテーブルでないことを確認します。 一方のテーブルのサイズが大きく、もう一方が小さい場合は、MapJoin 文を使用してサイズが小さいテーブルをキャッシュします。 構文と関連事項の説明については、「SELECT 構文の説明」をご参照ください。 ジョブが MapReduce ジョブの場合は、resource table 関数を使用して、サイズが小さいテーブルをキャッシュします。

  • 両方のテーブルのサイズが比較的大きい場合は、重複しているデータをできるだけ減らします。

  • 問題が解決しない場合は、データ量の多い 2 つのキーではデカルト積の計算を避けて、サービスの最適化を検討してください。

  • 小さいテーブルが大きいテーブルを LEFT JOIN で結合すると、odps が直接 LEFT JOIN で結合する速度は低下します。 この時点で、最初に小さいテーブルと大きいテーブルに MapJoin を実行し、小さいテーブルと大きいテーブルの共通のデータを取得します。この中間テーブルは大きいテーブルより大きくなってはなりません (大きなキーの傾斜があまり大きくなければ)。 小さいテーブルがこの中間テーブルと LEFT JOIN を実行するとき、その結果は小さなテーブルと大きなテーブルを LEFT JOIN するのと同じです。

GROUP BY のロングテール

原因

GROUP BY 文のキーに大量の計算がある場合は、ロングテールが発生します。

解決策

次のいずれかの方法でタスクを実行できます。

  • SQL 文を書き換え、乱数を追加して長いキーを分割します。 SQL 文は以下のとおりです。

    SELECT Key,COUNT(*) AS Cnt FROM TableName GROUP BY Key;

    Combiner がスキップされていれば、データは M ノードから R ノードに移動されます。 その後、R ノードは COUNT 操作を実行します。 実行計画は M > R で、ロングテールが発生したキーに作業を再配信する場合は、 SQL 文を以下のとおりに変更します。

    -- Assume that the key with the long tail is KEY001.
    SELECT a.Key
      , SUM(a.Cnt) AS Cnt
    FROM (
      SELECT Key
        , COUNT(*) AS Cnt
    FROM TableName
    GROUP BY Key, 
      CASE 
        WHEN Key = 'KEY001' THEN Hash(Random()) % 50
        ELSE 0
       END
    ) a
    GROUP BY a.Key;

    実行計画が M > R > R に変更されます。実行は長くなりますが、 ロングテールキーが 2 つの手順で処理されるため、消費時間が短くなる可能性があります。

    前述の方法で R 実行手順を追加して、重大ではないロングテールの問題を解決する場合は、消費時間が増加する可能性があります。

  • システムパラメーターを以下のように設定します。

    set odps.sql.groupby.skewindata=true.

    パラメーター設定は一般的な最適化の方法ですが、この方法は特定のサービスを考慮しないため、十分な結果を得られません。 実際のデータに基づいて、より効果的に SQL 文を書き換えます。

Distinct のロングテール

DISTINCT 文でロングテールが発生する場合は、キー分割の方法は適用されません。 他の方法を検討します。

解決策

--Original SQL statement, with the null UID skipped
SELECT COUNT(uid) AS Pv
    , COUNT(DISTINCT uid) AS Uv
FROM UserLog;

SQL 文を以下のように書き換えます。

SELECT SUM(PV) AS Pv
    , COUNT(*) AS UV
FROM (
    SELECT COUNT(*) AS Pv
      , uid
    FROM UserLog
    GROUP BY uid
) a;

DISTINCT 文は COUNT 文に書き換えられ、単一のReducer の計算の負荷が軽減されます。 この解決策は GROUP BY 文の最適化と Combiner の実行も可能で、パフォーマンスが大きく改善されます。

動的パーティションのロングテール

原因

  • 小さなファイルのデータをソートするために、動的パーティション機能では、実行の最終段階で Reduce タスクが開始されます。 動的パーティション機能で書き込まれたデータでデータスキューが発生した場合に、ロングテールが発生します。

  • 動的パーティション機能を誤って使用すると、よくロングテールが発生します。

解決策

データを書き込む対象のパーティションが決定された場合は、動的パーティション機能を使用する代わりに、INSERT 操作中にこのパーティションを指定します。

Combiner を使用したロングテールの除去

MapReduce のジョブについては、Combiner を使用してロングテールを取り除くのが一般的な方法です。 この方法は WordCount の例で説明されています。 Combiner は Mapper から Reducer へのデータ移動量を減らし、ネットワーク転送のオーバーヘッドを大幅に減らします。 最適化は、MaxCompute SQL 文に対して自動的に実行されます。

Combiner は Map の最適化のみをサポートしています。 Combiner の実行結果が 同じであることを確認する必要があります。 たとえば、WordCount では、2 の場合 (KEY,1) と 1 の場合 (KEY,2) で結果は同じです。 たとえば、平均値の計算では、(KEY,1) と (KEY,2) は Combiner 内で (KEY,1.5) に直接マージされません。

システム最適化を使用したロングテールの除去

ローカル Combiner に加えて、MaxCompute によって提供された最適化方法を使用して、ロングテールを取り除きます。 たとえば、下記の内容 (+N 個の バックアップ) はタスク実行プロセス中に記録されます。

M1_Stg1_job0:0/521/521[100%] M2_Stg1_job0:0/1/1[100%] J9_1_2_Stg5_job0:0/523/523[100%] J3_1_2_Stg1_job0:0/523/523[100%] R6_3_9_Stg2_job0:1/1046/1047[100%] 
M1_Stg1_job0:0/521/521[100%] M2_Stg1_job0:0/1/1[100%] J9_1_2_Stg5_job0:0/523/523[100%] J3_1_2_Stg1_job0:0/523/523[100%] R6_3_9_Stg2_job0:1/1046/1047[100%] 
M1_Stg1_job0:0/521/521[100%] M2_Stg1_job0:0/1/1[100%] J9_1_2_Stg5_job0:0/523/523[100%] J3_1_2_Stg1_job0:0/523/523[100%] R6_3_9_Stg2_job0:1/1046/1047(+1 backups)[100%] 
M1_Stg1_job0:0/521/521[100%] M2_Stg1_job0:0/1/1[100%] J9_1_2_Stg5_job0:0/523/523[100%] J3_1_2_Stg1_job0:0/523/523[100%] R6_3_9_Stg2_job0:1/1046/1047(+1 backups)[100%]

Reducer の総数は 1,047 で、そのうち 1,046 は完了しています。 最後の 1 つは未完了です。 システムがこの状態を確認すると、 同じデータを実行するために新しい Reducer を開始します。 次に、システムは最初に完了した Reducer のデータを取得し、データを結果セットに結合します。

サービスの最適化を使用したロングテールの除去

前述の最適化方法はすべての問題を解決できるわけではありません。 以下の例のように、サービスを分析することで、より良い解決策を探します。

  • 実際にはノイズの多いデータが大量に存在する可能性があります。 たとえば、 visitor ID によって、各ユーザーのアクセスレコードの行動データを確認します。 クローラーの識別が困難になっても、まずクローラーのデータを除外する必要があります。さもなければ、クローラーのデータは、計算処理中に容易にロングテールを発生させます。 同様のケースでは、特定の xxid によってデータを関連付ける場合に、 関連付けのためのフィールドが null かどうかを確認する必要があります。

  • 特殊なサービスがあります。たとえば、ISV 操作レコードはデータ量と動作に関して、一般的なユーザーレコードとは大きく異なります。 特殊な方法を使用して、主要なアカウントの ISV 操作レコードを個別に分析し、操作します。

  • データ分散が不均一な場合は、フルソートの DISTRIBUTE BY フィールドとして定数フィールドを使用しません。