このトピックでは、DataWorks のデータ移行機能を使用して、MongoDB から MaxCompute へ JSON フィールドを抽出する方法を設定します。

準備

  1. アカウントを作成します。

    DataWorks にデータソースを追加するには、事前にデータベースにユーザーを作成してください。 この例では、db.createUser({user:"bookuser",pwd:"123456",roles:["root"]}) コマンドを実行し、「bookuser」という名前のユーザーを作成します。 ユーザーのパスワードは 123456、権限は root です。

  2. データを準備します。
    MongoDB へデータをアップロードします。 この例では、Alibaba Cloud ApsaraDB for MongoDB が使用されています。 ネットワークタイプは VPC です。 (MongoDB が DataWorks のデフォルトリソースグループと通信するためには、インターネット IP アドレスが必要です。) テストデータは次のとおりです。
    
    {
        "store": {
            "book": [
                 {
                    "category": "reference",
                    "author": "Nigel Rees",
                    "title": "Sayings of the Century",
                    "price": 8.95
                 },
                 {
                    "category": "fiction",
                    "author": "Evelyn Waugh",
                    "title": "Sword of Honour",
                    "price": 12.99
                 },
                 {
                     "category": "fiction",
                     "author": "J. R. R. Tolkien",
                     "title": "The Lord of the Rings",
                     "isbn": "0-395-19395-8",
                     "price": 22.99
                 }
              ],
              "bicycle": {
                  "color": "red",
                  "price": 19.95
              }
        },
        "expensive": 10
    }
    MongoDB の DMS コンソールにログインします。 この例では、データベース名は admin、コレクションは userlog です。 アップロードされたデータを表示するには、クエリウィンドウでdb.userlog.find().limit(10) コマンドを実行します。

DataWorks を使用した MaxCompute へのデータ抽出

  • 1. MongoDB データソースの追加
    DataWorks コンソールで、[Data Integration] ページへ移動し、[MongoDB] を追加します。
    次の図のようにパラメーターが表示されます。 接続性テストが成功したら、[Complete] をクリックします この例では、MongoDB のネットワークタイプは VPC です。 したがって、[Public IP Address Available][Public IP Address Available] に設定する必要があります。IP アドレスとポート番号を取得するには、 にログインし、目的のインスタンスをクリックします。 次の図のようにパラメーターが表示されます。
  • 2. データ同期タスクの作成
    DataWorks コンソールで、データ同期ノードを作成します。 詳細については、「OTSStream リーダーの設定」をご参照ください。同時に、 JSON データを格納する「mqdata」という名前のテーブルをDataWorks で作成します。 詳しくは、「バケットの作成」をご参照ください。グラフィカルインターフェイスでテーブルパラメータを設定できます。 mqdata テーブルには MQ データという名前の列が 1 つだけあります。 データ型は String 型です。
  • 3. パラメーターを設定します。
    テーブルを作成したら、グラフィカルインターフェイスでデータ同期タスクのパラメーターを設定します。 まず宛先データソースを odps_first に、宛先テーブルを mqdata に設定します。 元のデータソースを MongoDB に設定し、mongodb_userlog を選択します。 上記の設定が終わったら、Switch to script mode をクリックします。 スクリプトモードのコードの例は次のとおりです。
    
    {
        "type": "job",
        "steps": [
            {
                "stepType": "mongodb",
                "parameter": {
                    "datasource": "mongodb_userlog",
     //Data source name
                    "column": [
                        {
                            "name": "store.bicycle.color", //JSON field path. In this example, the value of color is extracted.
                            "type": "document.document.string" //The number of fields in this line must be the same as that in the preceding line (the name line). If the JSON field is a level-1 field, for example, the expensive field in this topic, enter the string.
                        }
                    ],
                    "collectionName //Collection name": "userlog"
                },
                "name":"Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "isCompress": false,
                    "truncate": true,
                    "datasource": "odps_first",
                    "column": [
                              "mqdata" //Table column name in MaxCompute
                    ],
                    "emptyAsNull": false,
                    "table": "mqdata"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "concurrent": 2,
                "throttle": false,
                "dmu": 1
            }
        }
    }
    前述の設定が完了したら、[Run] をクリックします。 次の情報が表示されたら、コードは正常に実行されています。

結果の検証

Business Flow で ODPS SQL ノードを作成します。SELECT * from mqdata; 文を入力し、mqdata テーブルのデータを表示します。 SELECT * from mqdata; コマンドを MaxCompute クライアントで実行しても、データを表示できます。