It is well known that when irresistible problems (such as power failure, network disconnection, etc.) occur in a single computer room, it will lead to the failure to provide services normally and cause potential losses to the business. Therefore, in the field of collaborative office, a high-availability design that can be based on the multi-active mechanism in the same city or in different places can minimize the potential high-availability problems caused by only a single point of availability in the computer room while ensuring data consistency. It is particularly important to ensure the user experience of the business and reduce the potential loss of the business caused by single-point problems.

Active-active in the same city is of great significance and value to the high availability guarantee of production, which is indescribable. On the surface, dual-active in the same city is just a simple deployment of a production environment, but in terms of architecture, the impact of this change is huge, such as high-availability management of stateless applications, management of request traffic, management of version releases, and network architecture. Management, etc., the increased architectural complexity is huge.

Combining real collaborative office products: Beijing Office (a comprehensive platform that provides collaborative office services for the Beijing Municipal Government) with complex government affairs networks in the production environment and the case of the evolution of Beijing Office’s intra-city dual-active structure, I will introduce the continuous improvement of Beijing Office , Summary of some thinking and practical experience in the staged evolution process. This article only introduces and summarizes the solutions and experiences of ES clusters in the process of cross-computer room synchronization.

Deploy Logstash on the Kingsoft Cloud computer room, Logstash starts multiple instances (classified by different types to improve synchronization efficiency), and is in the same VPC as the ES cluster of Kingsoft Cloud computer room

Logstash needs to configure large network access rights to ensure that Logstash and ES original clusters and target clusters communicate with each other.

Data migration can be full migration or incremental migration. The first migration is a full migration and subsequent incremental data selection incremental migration.

Incremental migration needs to transform and increase the identity of the incremental data identified, and the specific method will be introduced later.

How Logstash works

Logstash is divided into three parts input, filter, ouput:

The input process receives data, and the data can come from ES, log files, kafka and other channels.

The filter filters and cleans the data.

ouput outputs data to the target device, which can be output to ES, kafka, files, etc.

Principle of Incremental Synchronization

1. For the data at time T, first use Logstash to migrate all the data before T to JD Cloud ES in the Youfu computer room, assuming that it takes ∆T

2. For the incremental data from T to T+∆T, use logstash again to import the data to the ES cluster of JD Cloud in Youfu computer room

3. Repeat the above step 2 until ∆T is small enough, then switch the business to HUAWEI CLOUD, and finally complete the migration of the newly added data

Scope of application: ES data has timestamps or other labels that can distinguish old and new data

Create ECS and install JDK to ignore, just install it yourself

Download the corresponding version of Logstash, try to choose the same version as Elasticsearch, or close to the version to install

1) Download the source code and directly decompress the installation package, ready to use out of the box

2) Modify the memory usage. The default heap memory of logstash is 1G. Selecting the appropriate memory according to the ECS cluster can speed up the migration efficiency of cluster data.

3. Migrate Index

Logstash will help users create indexes automatically, but the automatically created indexes will be slightly different from the user’s own indexes, resulting in inconsistent search formats of final data. Generally, indexes need to be created manually to ensure that the index data is completely consistent.

The python script for index creation is provided below, and users can use this script to create the required index.

The file is a python script for synchronous indexing, and config.yaml is a cluster address configuration file.

Note:To use this script, you need to install related dependencies

yum install -y PyYAML
yum install -y python-requests

Copy the following code and save it as

import yaml
import requests
import json
import getopt
import sys

def help():
    -h/--help print this help.
    -c/--config config file path, default is config.yaml
    python -c config.yaml 
def process_mapping(index_mapping, dest_index):
    # remove unnecessary keys
    del index_mapping["settings"]["index"]["provided_name"]
    del index_mapping["settings"]["index"]["uuid"]
    del index_mapping["settings"]["index"]["creation_date"]
    del index_mapping["settings"]["index"]["version"]

    # check alias
    aliases = index_mapping["aliases"]
    for alias in list(aliases.keys()):
        if alias == dest_index:
                "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
            del index_mapping["aliases"][alias]
    if index_mapping["settings"]["index"].has_key("lifecycle"):
        lifecycle = index_mapping["settings"]["index"]["lifecycle"]
        opendistro = {"opendistro": {"index_state_management":
                                         {"policy_id": lifecycle["name"],
                                          "rollover_alias": lifecycle["rollover_alias"]}}}
        # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
        del index_mapping["settings"]["index"]["lifecycle"]
    return index_mapping
def put_mapping_to_target(url, mapping, source_index, dest_auth=None):
    headers = {'Content-Type': 'application/json'}
    create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth)
    if create_resp.status_code != 200:
            "create index " + url + " failed with response: " + str(create_resp) + ", source index is " + source_index)
        with open(source_index + ".json", "w") as f:
            json.dump(mapping, f)
def main():
    config_yaml = "config.yaml"
    opts, args = getopt.getopt(sys.argv[1:], '-h-c:', ['help', 'config='])
    for opt_name, opt_value in opts:
        if opt_name in ('-h', '--help'):
        if opt_name in ('-c', '--config'):
            config_yaml = opt_value

    config_file = open(config_yaml)
    config = yaml.load(config_file)
    source = config["source"]
    source_user = config["source_user"]
    source_passwd = config["source_passwd"]
    source_auth = None
    if source_user != "":
        source_auth = (source_user, source_passwd)
    dest = config["destination"]
    dest_user = config["destination_user"]
    dest_passwd = config["destination_passwd"]
    dest_auth = None
    if dest_user != "":
        dest_auth = (dest_user, dest_passwd)

    # only deal with mapping list
    if config["only_mapping"]:
        for source_index, dest_index in config["mapping"].iteritems():
            print("start to process source index" + source_index + ", target index: " + dest_index)
            source_url = source + "/" + source_index
            response = requests.get(source_url, auth=source_auth)
            if response.status_code != 200:
                print("*** get ElasticSearch message failed. resp statusCode:" + str(
                    response.status_code) + " response is " + response.text)
            mapping = response.json()
            index_mapping = process_mapping(mapping[source_index], dest_index)

            dest_url = dest + "/" + dest_index
            put_mapping_to_target(dest_url, index_mapping, source_index, dest_auth)
            print("process source index " + source_index + " to target index " + dest_index + " successed.")
        # get all indices
        response = requests.get(source + "/_alias", auth=source_auth)
        if response.status_code != 200:
            print("*** get all index failed. resp statusCode:" + str(
                response.status_code) + " response is " + response.text)
        all_index = response.json()
        for index in list(all_index.keys()):
            if "." in index:
            print("start to process source index" + index)
            source_url = source + "/" + index
            index_response = requests.get(source_url, auth=source_auth)
            if index_response.status_code != 200:
                print("*** get ElasticSearch message failed. resp statusCode:" + str(
                    index_response.status_code) + " response is " + index_response.text)
            mapping = index_response.json()

            dest_index = index
            if index in config["mapping"].keys():
                dest_index = config["mapping"][index]
            index_mapping = process_mapping(mapping[index], dest_index)

            dest_url = dest + "/" + dest_index
            put_mapping_to_target(dest_url, index_mapping, index, dest_auth)
            print("process source index " + index + " to target index " + dest_index + " successed.")

if __name__ == '__main__':

The configuration file is saved as config.yaml:

# 源端ES集群地址,加上http://
source: http://ip:port
source_user: "username"
source_passwd: "password"
# 目的端ES集群地址,加上http://
destination: http://ip:port
destination_user: "username"
destination_passwd: "password"

# 是否只处理这个文件中mapping地址的索引
# 如果设置成true,则只会将下面的mapping中的索引获取到并在目的端创建
# 如果设置成false,则会取源端集群的所有索引,除去(.kibana)
# 并且将索引名称与下面的mapping匹配,如果匹配到使用mapping的value作为目的端的索引名称
# 如果匹配不到,则使用源端原始的索引名称
only_mapping: true

# 要迁移的索引,key为源端的索引名字,value为目的端的索引名字
    source_index: dest_index

The above code and configuration files are ready, and the index synchronization can be completed by directly executing python

After the index synchronization is complete, you can view it on kibana of the target cluster or execute curl to view the index migration status:

GET _cat/indices?v

Logstash configuration is located in the config directory.

Users can refer to the configuration to modify the Logstash configuration file. In order to ensure the accuracy of the migrated data, it is generally recommended to establish multiple groups of Logstash, migrate data in batches, and migrate part of the data for each Logstash.

Configuration reference for inter-cluster migration configuration:

        # 源端地址
        hosts =>  ["ip1:port1","ip2:port2"]
        # 安全集群配置登录用户名密码
        user => "username"
        password => "password"
        # 需要迁移的索引列表,以逗号分隔,支持通配符
        index => "a_*,b_*"
        # 以下三项保持默认即可,包含线程数和迁移数据大小和logstash jvm配置相关
        slices => 10
        size => 2000
        scroll => "60m"

filter {
  # 去掉一些logstash自己加的字段
  mutate {
    remove_field => ["@timestamp", "@version"]

        # 目的端es地址
        hosts => ["http://ip:port"]
        # 安全集群配置登录用户名密码
        user => "username"
        password => "password"
 # 目的端索引名称,以下配置为和源端保持一致
        index => "%{[@metadata][_index]}"
        # 目的端索引type,以下配置为和源端保持一致
        document_type => "%{[@metadata][_type]}"
        # 目标端数据的_id,如果不需要保留原_id,可以删除以下这行,删除后性能会更好
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false

    # 调试信息,正式迁移去掉
    stdout { codec => rubydebug { metadata => true }}


1. @timestamp is deprecated after elasticsearch2.0.0beta

2. The migration of the Beijing Office from Kingsoft Cloud Data Center to Jingdong Youfu Data Center involves many business areas, and the timestamp fields of the new records represented by each business line are not uniform, and the compatibility workload involved is large , so consider adding a unified incremental tag field: gmt_created_at for preprocessing through the preprocessing function pipeline in elasticsearch to reduce the complexity of the migration work (each business line can evaluate whether this step is needed by itself).

PUT _ingest/pipeline/gmt_created_at
  "description": "Adds gmt_created_at timestamp to documents",
  "processors": [
      "set": {
        "field": "_source.gmt_created_at",
        "value": "{{_ingest.timestamp}}"

3. Check whether the pipeline is effective

GET _ingest/pipeline/*

4. Each index setting corresponds to the settings to increase the pipeline as the default preprocessing

PUT index_xxxx/_settings
  "settings": {
    "index.default_pipeline": "gmt_created_at"

5. Check whether the new settings take effect

GET index_xxxx/_settings

Incremental migration script


index: the way wildcards can be used

query: DSL for incremental synchronization, unify gmt_create_at as a special mark for incremental synchronization

schedule: Synchronize one every minute, “* * * * *”

input {
elasticsearch {
        hosts =>  ["ip:port"]
        # 安全集群配置登录用户名密码
        user => "username"
        password => "password"
        index => "index_*"
        query => '{"query":{"range":{"gmt_create_at":{"gte":"now-1m","lte":"now/m"}}}}'
        size => 5000
        scroll => "5m"
        docinfo => true
        schedule => "* * * * *"
filter {
     mutate {
      remove_field => ["source", "@version"]
output {
    elasticsearch {
        # 目的端es地址
        hosts => ["http://ip:port"]
        # 安全集群配置登录用户名密码
        user => "username"
        password => "password"
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false

# 调试信息,正式迁移去掉
stdout { codec => rubydebug { metadata => true }}


There is a field of join parent-child type in the mapping, and a 400 exception is reported for direct migration

[2022-09-20T20:02:16,404][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, 
:action=>["index", {:_id=>"xxx", :_index=>"xxx", :_type=>"joywork_t_work", :routing=>nil}, #<LogStash::Event:0x3b3df773>], 
:response=>{"index"=>{"_index"=>"xxx", "_type"=>"xxx", "_id"=>"xxx", "status"=>400, 
"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", 
"caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"[routing] is missing for join field [task_user]"}}}}}


Combined with the business characteristics, by adding a small amount of ruby ​​code to the filter, the value of _routing is taken out and put back into the logstah event, so the problem is solved.


#Crosscomputer #room #synchronization #practice #Cloud #developers #personal #space #News Fast Delivery

Leave a Comment

Your email address will not be published. Required fields are marked *