All Products
Search
Document Center

Elastic Compute Service:Manage ECS instances in Python without logging on to the instances

最終更新日:Sep 06, 2023

You can use Cloud Assistant to simultaneously run a command on multiple Elastic Compute Service (ECS) instances. The command can be a shell, batch, or PowerShell command. You can use SSH or Remote Desktop Protocol (RDP) to log on to ECS instances and perform O&M operations. However, you can use Cloud Assistant to perform O&M operations on ECS instances without logging on to the instances. This topic describes how to use Cloud Assistant to manage ECS instances in a Python development environment.

Prerequisites

  • Cloud Assistant Agent is installed on the ECS instances that you want to manage. For information about how to install Cloud Assistant Agent, see Install Cloud Assistant Agent.

  • The aliyun-python-sdk-ecs SDK dependency in Python is updated to V2.1.2 or later. For more information about the latest version of ECS SDK for Python, visit GitHub Repo Alibaba Cloud.

Procedure

  1. Compile the shell, batch, or PowerShell command based on the instance configurations and the operations that you want to perform.

  2. Find the instances that meet the specified requirements.

    The instances must be in the Running (Running) state. For more information about how to use ECS SDK for Python to query instances, see Query an ECS instance.

  3. Obtain the AccessKey pair of your account and query the region ID.

    For more information, see Regions and zones and Create an AccessKey pair.

  4. Run a Cloud Assistant command on one or more ECS instances.

    Sample code:

    # coding=utf-8
    # If the Python sdk is not installed, run 'sudo pip install aliyun-python-sdk-ecs'.
    # Make sure you're using the latest sdk version.
    # Run 'sudo pip install --upgrade aliyun-python-sdk-ecs' to upgrade.
    
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkcore.acs_exception.exceptions import ClientException
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkecs.request.v20140526.RunCommandRequest import RunCommandRequest
    from aliyunsdkecs.request.v20140526.DescribeInvocationResultsRequest import DescribeInvocationResultsRequest
    import json
    import sys
    import base64
    import time
    import logging
    import os
    
    # Configure the log output formatter
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(name)s [%(levelname)s]: %(message)s",
                        datefmt='%m-%d %H:%M')
    
    logger = logging.getLogger()
    
    # Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured in the code runtime. 
    # If the project code is leaked, the AccessKey pair may be leaked and the security of resources within your account may be compromised. The following sample code shows how to use environment variables to obtain an AccessKey pair and use the AccessKey pair to call API operations. The sample code is for reference only. We recommend that you use Security Token Service (STS) tokens, which provide higher security.
    # Replace <RegionId> with the actual region ID. You can call the DescribeRegions operation to query the most recent region list. 
    access_key = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']             
    access_key_secret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']  
    region_id = '<RegionId>'                  
    
    
    
    
    client = AcsClient(access_key, access_key_secret, region_id)
    
    def base64_decode(content, code='utf-8'):
        if sys.version_info.major == 2:
            return base64.b64decode(content)
        else:
            return base64.b64decode(content).decode(code)
    
    
    def get_invoke_result(invoke_id):
        request = DescribeInvocationResultsRequest()
        request.set_accept_format('json')
    
        request.set_InvokeId(invoke_id)
        response = client.do_action_with_exception(request)
        response_detail = json.loads(response)["Invocation"]["InvocationResults"]["InvocationResult"][0]
        status = response_detail.get("InvocationStatus","")
        output = base64_decode(response_detail.get("Output",""))
        return status,output
    
    
    def run_command(cmdtype,cmdcontent,instance_id,timeout=60):
        """
        cmdtype: the type of the command. Valid values: RunBatScript, RunPowerShellScript, and RunShellScript.
        cmdcontent: the content of the command.
        instance_id: the ID of the instance.
        """
        try:
            request = RunCommandRequest()
            request.set_accept_format('json')
    
            request.set_Type(cmdtype)
            request.set_CommandContent(cmdcontent)
            request.set_InstanceIds([instance_id])
            # Specify the execution timeout period of the command. Unit: seconds. Default value: 60. Specify this parameter based on the actual command.
            request.set_Timeout(timeout)
            response = client.do_action_with_exception(request)
            invoke_id = json.loads(response).get("InvokeId")
            return invoke_id
        except Exception as e:
            logger.error("run command failed")
    
    
    
    def wait_invoke_finished_get_out(invoke_id,wait_count,wait_interval):
        for i in range(wait_count):
            status,output = get_invoke_result(invoke_id)
            if status not in ["Running","Pending","Stopping"]:
                return status,output
            time.sleep(wait_interval)
    
        logger.error("after wait %d times, still can not wait invoke-id %s finished")
        return "",""
    
    
    
    def run_task():
        # Specify the type of the Cloud Assistant command.
        cmdtype = "RunShellScript"
        # Specify the content of the Cloud Assistant command.
        cmdcontent = """
        #!/bin/bash
        yum check-update
        """
        # Specify the timeout period.
        timeout = 60
        # Specify the ID of your instance.
        ins_id = "i-wz9bsqk9pa0d2oge****" 
        # Run the command.
        invoke_id = run_command(cmdtype,cmdcontent,ins_id,timeout)
        logger.info("run command,invoke-id:%s" % invoke_id)
    
        # Wait for the command to be run. Query the execution status of the command 10 times at an interval of 5 seconds. You can also specify the number of times the command execution status is queried and the query interval based on your business requirements.
        status,output = wait_invoke_finished_get_out(invoke_id,10,5)
        if status:
            logger.info("invoke-id execute finished,status: %s,output:%s" %(status,output))
    
    
    if __name__ == '__main__':
        run_task()