このトピックでは、外部 Elasticsearch インスタンスから Alibaba Cloud Elasticsearch インスタンスにデータを移行し、データのインデックスを再作成する方法について説明します。 外部 Elasticsearch インスタンスは、Elastic Compute Service (ECS) インスタンスで実行されています。

手順

データ移行手順には、次の手順が含まれます。
  1. インデックスの作成
  2. データの移行

このトピックでは、よくある質問とそれに対する解決策についても説明します。 詳細については、「FAQ」をご参照ください。

前提条件

このトピックの手順に従ってデータを移行する前に、次の要件が満たされていることを確認してください。 これらの要件を満たしていない場合は、別のデータ移行計画を選択してください。
  • 外部 Elasticsearch インスタンスをホストする ECS インスタンスは、VPC ネットワークにデプロイされている必要があります。 ClassicLink を介して VPC ネットワークに接続された ECS インスタンスを使用することはできません。 外部 Elasticsearch インスタンスと Alibaba Cloud Elasticsearch インスタンスが、同じ VPC ネットワークに接続されていることを確認する必要があります。
  • このトピックのスクリプトは、中間サーバーで実行できます。 中間サーバーがポート 9200 を介して両方の Elasticsearch インスタンスに接続されていることを確認します。
  • Alibaba Cloud Elasticsearch インスタンスの IP アドレスを、外部 Elasticsearch インスタンスをホストする ECS インスタンスの VPC セキュリティグループに追加し、ポート 9200 を有効にします。
  • Alibaba Cloud Elasticsearch インスタンスのノードの IP アドレスを、外部 Elasticsearch インスタンスをホストする ECS インスタンスの VPC セキュリティグループに追加します。 これにより、ノードは外部 Elasticsearch インスタンスに接続できます。 Alibaba Cloud Elasticsearch インスタンスの Kibana コンソールからノードの IP アドレスをクエリできます。
  • 外部 Elasticsearch インスタンスと Alibaba Cloud Elasticsearch インスタンスは相互接続されています。 curl -XGET http://<host>:9200 コマンドを実行して、中間サーバーで接続をテストできます。

インデックスの作成

外部 Elasticsearch インスタンスの既存のインデックス設定に従って、Alibaba Cloud Elasticsearch インスタンスに新しいインデックスを作成します。 Alibaba Cloud Elasticsearch インスタンスの自動インデックスを有効にして、インデックスとマッピングを自動的に作成することもできます。 ただし、自動インデックスを使用しないことを推奨します。

次の例は、外部 Elasticsearch インスタンスから Alibaba Cloud Elasticsearch インスタンスにインデックスのバッチを移行するための Python スクリプトです。 デフォルトでは、新しいインデックスのレプリカは作成されません。
#! /usr/bin/python
# -*- coding: UTF-8 -*-
# File name: indiceCreate.py
import sys
import base64
import time
import httplib
import json
## The ECS instance that hosts the source Elasticsearch instance (IP address + Port).
oldClusterHost = "old-cluster.com"
# The username of the source Elasticsearch instance. The username field can be left empty.
oldClusterUserName = "old-username"
## The password of the source Elasticsearch instance. The password field can be left empty.
oldClusterPassword = "old-password"
## The ECS instance that hosts the destination Elasticsearch instance (IP address + Port).
newClusterHost = "new-cluster.com"
## The username of the destination Elasticsearch instance. The username field can be left empty.
newClusterUser = "new-username"
## The password of the destination Elasticsearch instance. The password field can be left empty.
newClusterPassword = "new-password"
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
    conn = httplib.HTTPConnection(host)
    headers = {}
    if (username ! = "") :
        'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
        base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
        headers["Authorization"] = "Basic %s" % base64string;
    if "GET" == method:
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        conn.request(method=method, url=endpoint, headers=headers)
    else :
        headers["Content-Type"] = "application/json"
        conn.request(method=method, url=endpoint, body=params, headers=headers)
    response = conn.getresponse()
    res = response.read()
    return res
def httpGet(host, endpoint, username="", password=""):
    return httpRequest("GET", host, endpoint, "", username, password)
def httpPost(host, endpoint, params, username="", password=""):
    return httpRequest("POST", host, endpoint, params, username, password)
def httpPut(host, endpoint, params, username="", password=""):
    return httpRequest("PUT", host, endpoint, params, username, password)
def getIndices(host, username="", password=""):
    endpoint = "/_cat/indices"
    indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
    indicesList = indicesResult.split("\n")
    indexList = []
    for indices in indicesList:
        if (indices.find("open") > 0):
            indexList.append(indices.split()[2])
    return indexList
def getSettings(index, host, username="", password=""):
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print index + "  The original settings: \n" + indexSettings
    settingsDict = json.loads(indexSettings)
    ## The number of shards equals those of the indexes on the source Elasticsearch instance by default.
    number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    ## The default number of replicas is 0.
    number_of_replicas = DEFAULT_REPLICAS
    newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
    return newSetting
def getMapping(index, host, username="", password=""):
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print index + "The original mappings: \n" + indexMapping
    mappingDict = json.loads(indexMapping)
    mappings = json.dumps(mappingDict[index]["mappings"])
    newMapping = "\"mappings\" : " + mappings
    return newMapping
def createIndexStatement(oldIndexName):
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
    return createstatement
def createIndex(oldIndexName, newIndexName=""):
    if (newIndexName == "") :
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print "New index" + newIndexName + "Index settings and mappings: \n" + createstatement
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    print "New index" + newIndexName + "Creation result:" + createResult
## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if (index.startswith(".")):
        systemIndex.append(index)
    else :
        createIndex(index, index)
if (len(systemIndex) > 0) :
    for index in systemIndex:
        print index + "It may be a system index that will not be recreated. You can manually recreate the index as needed."

データの移行

後述のいずれかの方法でデータを移行できます。 移行するデータの量に基づいて適切な方法を選択します。

重要
  • データの整合性を確保するには、データを移行する前にソース Elasticsearch インスタンスへのデータの書き込みを停止する必要があります。 ただし、ソース Elasticsearch インスタンスからデータを読み取ることはできます。 データが移行された後、ターゲット Elasticsearch インスタンスに切り替えて、データを読み書きできます。 ソース Elasticsearch インスタンスへのデータの書き込みを停止しない場合、ターゲット Elasticsearch インスタンスのデータはソース Elasticsearch インスタンスのデータと一致しない可能性があります。
  • 後述のいずれかの方法でデータを移行する場合、IP + Port を使用してソース Elasticsearch インスタンスを接続するには、最初にターゲット Elasticsearch インスタンスの YML 設定を変更する必要があります。 ソースインスタンスの IP アドレスを reindex ホワイトリストに追加します。 例:reindex.remote.whitelist: 1.1.1.1:9200,1.2. *. *:*
  • エンドポイントを使用してソース Elasticsearch インスタンスに接続する場合、 http://host:port/path 形式を使用しないでください。 ドメイン名に path を含めないでください。
  • 少量のデータを移行する
    reindex.sh スクリプトを実行します。
    #! /bin/bash
    # file:reindex.sh
    indexName="The name of the index"
    newClusterUser="The username of the destination Elasticsearch instance"
    Newclusterpass = "The password of the destination Elasticsearch instance"
    newClusterHost="The ECS instance that hosts the destination Elasticsearch instance"
    Oldclusteruser = "The username of the source Elasticsearch instance"
    Oldclusterpass = "The password of the source Elasticsearch instance"
    # The address of the ECS instance that hosts the source Elasticsearch instance must be in this format: [scheme]://[host]:[port]. Example: http://10.37.1.1:9200.
    Oldclusterhost = "The ECS instance that hosts the source Elasticsearch instance"
    curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex? pretty" -H "Content-Type: application/json" -d'{
        "source": {
            "remote": {
                "host": "'${oldClusterHost}'",
                "username": "'${oldClusterUser}'",
                "password": "'${oldClusterPass}'"
            },
            "index": "'${indexName}'",
            "query": {
                "match_all": {}
            }
        },
        "dest": {
           "index": "'${indexName}'"
        }
    }'
  • 大量のデータを移行する (削除操作なし、ローリング更新時間ありの場合)
    削除操作なしで大量のデータを移行するには、ローリング更新を使用すると、書き込み操作の中断期間を短縮できます。 ローリング更新では、スクリプトに時系列フィールドを追加して、更新時間を定義する必要があります。 ソース Elasticsearch インスタンスからターゲット Elasticsearch インスタンスにデータを移行した後、ソース Elasticsearch インスタンスへのデータの書き込みを停止できます。 次に、ローリング更新を使用して、データ移行中に更新されたデータを同期します。 ローリング更新が完了したら、ターゲット Elasticsearch インスタンスに切り替え、読み取り操作と書き込み操作を再開します。
    #! /bin/bash
    # file: circleReindex.sh
    # CONTROLLING STARTUP:
    # This is a script that uses the Reindex operation to remotely reindex the data. Requirements:
    #1. You have created indexes on the destination Elasticsearch instance, or the destination instance supports auto-indexing and dynamic mapping.
    # 2. You must configure the IP whitelist in the YML configuration of the destination Elasticsearch instance: reindex.remote.whitelist: 172.16.123. *:9200
    #3. The specified ECS instance address must be in the following format: [scheme]://[host]:[port].
    USAGE="Usage: sh circleReindex.sh <count>
           count: the number of times to perform the reindex operation. A negative number indicates loop execution. You can set this parameter to perform the reindex operation only once or multiple times.
    Example:
            sh circleReindex.sh 1
            sh circleReindex.sh 5
            sh circleReindex.sh -1"
    indexName="The name of the index"
    newClusterUser="The username of the destination Elasticsearch instance"
    newClusterPass="The password of the destination Elasticsearch instance"
    oldClusterUser="The username of the source Elasticsearch instance"
    oldClusterPass="The password of the source Elasticsearch instance"
    ## http://myescluster.com
    newClusterHost="The ECS instance that hosts the destination Elasticsearch instance"
    # You need to specify address of the ECS instance that hosts the source Elasticsearch instance in the following format: [scheme]://[host]:[port]. Example: http://10.37.1.1:9200
    oldClusterHost="The ECS instance that hosts the source Elasticsearch instance"
    timeField="The field that specifies the time window during which the incremental data is migrated"
    reindexTimes=0
    lastTimestamp=0
    curTimestamp=`date +%s`
    hasError=false
    function reIndexOP() {
        reindexTimes=$[${reindexTimes} + 1]
        curTimestamp=`date +%s`
        ret=`curl -u ${newClusterUser}:${newClusterPass} -XPOST "${newClusterHost}/_reindex? pretty" -H "Content-Type: application/json" -d '{
            "source": {
                "remote": {
                    "host": "'${oldClusterHost}'",
                    "username": "'${oldClusterUser}'",
                    "password": "'${oldClusterPass}'"
                },
                "index": "'${indexName}'",
                "query": {
                    "range" : {
                        "'${timeField}'" : {
                            "gte" : '${lastTimestamp}',
                            "lt" : '${curTimestamp}'
                        }
                    }
                }
            },
            "dest": {
                "index": "'${indexName}'"
            }
        }'`
        lastTimestamp=${curTimestamp}
        echo "${reindexTimes} reindex operations have been performed. The last reindex operation is completed at ${lastTimestamp} Result: ${ret}"
        if [[ ${ret} == *error* ]]; then
            hasError=true
            echo "An unknown error occurred while performing this operation. All subsequent operations have been suspended."
        fi
    }
    function start() {
        ## A negative number indicates loop execution.
        if [[ $1 -lt 0 ]]; then
            while :
            do
                reIndexOP
            done
        elif [[ $1 -gt 0 ]]; then
            k=0
            while [[ k -lt $1 ]] && [[ ${hasError} == false ]]; do
                reIndexOP
                let ++k
            done
        fi
    }
    ## main 
    if [ $# -lt 1 ]; then
        echo "$USAGE"
        exit 1
    fi
    echo "Start the reindex operation for index ${indexName}"
    start $1
    echo "You have performed ${reindexTimes} reindex operations"
  • 大量のデータを移行する (削除操作またはローリング更新時間なしの場合)
    大量のデータを移行する必要があり、マッピングに更新時間フィールドが定義されていない場合、ソースインスタンスのワークロードのスクリプトに更新時間フィールドを追加する必要があります。 フィールドを追加した後、既存のデータを移行してから、前述のデータ移行計画で説明されているローリング更新を使用して増分データを移行できます。
    #! /bin/bash
    # file:miss.sh
    indexName="The name of the index"
    newClusterUser="The username of the destination Elasticsearch instance"
    Newclusterpass = "The password of the destination Elasticsearch instance"
    newClusterHost="The ECS instance that hosts the destination Elasticsearch instance"
    Oldclusteruser = "The username of the source Elasticsearch instance"
    Oldclusterpass = "The password of the source Elasticsearch instance"
    # The address of the ECS instance that hosts the source Elasticsearch instance must be in this format: [scheme]://[host]:[port]. Example: http://10.37.1.1:9200.
    oldClusterHost="The ECS instance that hosts the source Elasticsearch instance"
    timeField="updatetime"
    curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex? pretty" -H "Content-Type: application/json" -d '{
        "source": {
            "remote": {
                "host": "'${oldClusterHost}'",
                "username": "'${oldClusterUser}'",
                "password": "'${oldClusterPass}'"
            },
            "index": "'${indexName}'",
            "query": {
                "bool": {
                    "must_not": {
                        "exists": {
                            "field": "'${timeField}'"
                        }
                    }
                }
            }
        },
        "dest": {
           "index": "'${indexName}'"
        }
    }'
  • 書き込み操作を停止せずにデータを移行する

    このデータ移行計画は今後提供される予定です。

FAQ

  • 問題:curl コマンドを実行すると、{"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406} が表示される。
    解決策:-H "Content-Type: application/json" パラメーターを curl コマンドに追加し、もう一度お試しください。
    // Obtain all the indexes on the source instance. If you do not have the required permissions, remove the "-u user:pass" parameter. Make sure that you have replaced oldClusterHost with the name of the ECS instance that hosts the source Elasticsearch instance.
      curl -u user:pass -XGET http://oldClusterHost/_cat/indices | awk '{print $3}'
      // Based on the returned indexes, obtain the setting and mapping of the index that you need to migrate for the specified user. Make sure that you have replaced indexName with the index name that you need to query.
      curl -u user:pass -XGET http://oldClusterHost/indexName/_settings,_mapping?pretty=true
      // Create a new index on the destination Elasticsearch instance according to the _settings and _mapping settings that you have obtained from the preceding step. You can set the number of index replicas to 0 to accelerate the data synchronization process, and change the number of replicas to one after the migration is complete.
      // ewClusterHost indicates the ECS instance that hosts the destination Elasticsearch instance, testindex indicates the name of the index that you have created, and testtype indicates the type of the index.
      curl -u user:pass -XPUT http://<newClusterHost>/<testindex> -d '{
        "testindex" : {
            "settings" : {
                "number_of_shards" : "5", //Specify the number of shards for the corresponding index on the source Elasticsearch instance, for example, 5
                "number_of_replicas" : "0" //Set the number of index replicas to zero
              }
            },
            "mappings" : { //Set the mapping for the index on the source Elasticsearch instance. For example, you can set the mapping as follows
                "testtype" : {
                    "properties" : {
                        "uid" : {
                            "type" : "long"
                        },
                        "name" : {
                            "type" : "text"
                        },
                        "create_time" : {
                          "type" : "long"
                        }
                    }
               }
           }
       }
    }'
  • 問題:データ移行プロセスが遅すぎる。
    解決策:インデックスが大きすぎる場合、移行前にレプリカの数を 0 に、更新間隔を -1 に設定します。 データの移行後、レプリカと更新の設定を元の値に戻します。 これにより、同期プロセスが高速化されます。
    // You can set the number of index replicas to 0 and disable refresh to accelerate the migration process.
    curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
            "number_of_replicas" : 0,
            "refresh_interval" : "-1"
    }'
    // After the data is migrated, set the number of index replicas to 1 and the refresh interval to 1s (default value, which means 1 second).
    curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
            "number_of_replicas" : 1,
            "refresh_interval" : "1s"
    }'
このトピックのコンテンツの一部は、『Reindex API』から引用しています。