吾爱破解 - 52pojie.cn

[Python Original] [Original Source Code] [Python] Batch-purchase servers from a certain cloud provider and run code on them

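qiujw posted on 2021-11-12 10:09
Batch-creates servers on a certain cloud provider and runs code on them. Ops engineers may find it useful: the relevant code is already wrapped up, so it frees your hands.

Install the dependencies: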
pip install aliyun-python-sdk-ecs
pip install --upgrade aliyun-python-sdk-ecs
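
The comments at the top of the script ask you to confirm the installed SDK version with 'pip show'. The same check can be done from Python; the following is a minimal sketch using only the standard library (Python 3.8+), and the helper name installed_version is a hypothetical one chosen for this example:

```python
from importlib import metadata


def installed_version(package_name):
    """Return the installed version string of a package, or None if it is missing."""
    try:
        return metadata.version(package_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_version("aliyun-python-sdk-ecs")
    if version is None:
        print("aliyun-python-sdk-ecs is not installed")
    else:
        print("aliyun-python-sdk-ecs version:", version)
```
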

[Python]
#  coding=utf-8
# If the Python SDK is not installed yet, run 'sudo pip install aliyun-python-sdk-ecs'.
# If the Python SDK is already installed, run 'sudo pip install --upgrade aliyun-python-sdk-ecs'.
# Make sure the SDK version is 4.4.3; you can check it with 'pip show aliyun-python-sdk-ecs'.

import json
import logging
import os
import time
from datetime import datetime, timedelta

from aliyunsdkcore import client
from aliyunsdkecs.endpoint import EndpointData
from aliyunsdkecs.request.v20140526.DeleteInstanceRequest import DeleteInstanceRequest
from aliyunsdkecs.request.v20140526.DeleteInstancesRequest import DeleteInstancesRequest
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
from aliyunsdkecs.request.v20140526.DescribeRegionsRequest import DescribeRegionsRequest
from aliyunsdkecs.request.v20140526.RunInstancesRequest import RunInstancesRequest
from aliyunsdkecs.request.v20140526.SendFileRequest import SendFileRequest
from aliyunsdkecs.request.v20140526.RunCommandRequest import RunCommandRequest
from aliyunsdkecs.request.v20140526.DescribeAccountAttributesRequest import DescribeAccountAttributesRequest
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S')


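# Default instance login password. The ECS_PASSWORD environment variable (a name chosen here,
# not something the SDK reads) overrides it, so the password need not live in the source.
ecs_pwd = os.environ.get("ECS_PASSWORD", "abcd123.")

# Documentation: https://partners-intl.aliyun.com/help/doc-detail/25506.htm?spm=a2c63.p38356.b99.674.569f59abJyS8k4
# Manages ECS instances (creation, running scripts, and so on).
class EcsInstance():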

    def __init__(self,
                 ak_id,
                 ak_secret,
                 # 4c 8g
                 instance_type="ecs.c6e.xlarge",
                 vswitch_id=None,
                 image_id=None,
                 # Security group of the current VPC type
                 security_group_id= None,
                 region_id = None, #"cn-hongkong",
                 zone_id = None, #"cn-hongkong-b",
                 endpoint = None, #"ecs-cn-hangzhou.aliyuncs.com"
                 system_disk_type="cloud_essd",
                 ecs_password=ecs_pwd,
                 target_dir="/tmp",
                 auto_release_time = None
                 ):
        print("#######New EcsInstance()#######")
        # Set the region. If you do not know which one to use, call regions_request() to list all regions.
        self.region_id = region_id
        self.zone_id = zone_id

        if region_id is None:
            print("region Id can not be empty!")

        if endpoint is None:
            region_endpoint = EndpointData().getEndpointMap().get(region_id)
            if region_endpoint is None:
                region_endpoint = "ecs-cn-hangzhou.aliyuncs.com"
            self.endpoint = region_endpoint
        else:
            self.endpoint = endpoint

        if not ak_id or len(ak_id) <= 0:
            print("access key Id can not be empty!")

        if not ak_secret or len(ak_secret) <= 0:
            print("access key secret can not be empty!")

        self.clt = client.AcsClient(ak_id, ak_secret, self.region_id)

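        # Instance type.
        # 4 vCPU, 8 GiB
        # instance_type = "ecs.c6e.xlarge"
        # 4 vCPU, 16 GiB
        # instance_type = "ecs.g6e.xlarge"
        self.instance_type = instance_type
        # The vSwitch to use.
        self.vswitch_id = vswitch_id
        # The image to use.
        self.image_id = image_id
        # Security group of the current VPC type.
        self.security_group_id = security_group_id
        # Number of ECS instances to create in one batch. Valid range: 1-100, default: 1.
        self.amount = 1
        # Auto-release time, in UTC, formatted as yyyy-MM-ddTHH:mm:ssZ. The earliest release time is
        # half an hour after the current time; the latest is no more than three years after it.
        # This script therefore uses a minimum lead time of 31 minutes.
        if not auto_release_time:
            self.auto_release_time = self.local2Utc(31)  # e.g. "2021-09-26T18:20Z"
        else:
            self.auto_release_time = auto_release_time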

        self.system_disk_type = system_disk_type
        self.ecs_password = ecs_password
        self.target_dir = target_dir

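    # Accessor for the login password. It is named get_ecs_password because a method called
    # ecs_password is shadowed by the self.ecs_password attribute set in __init__.
    def get_ecs_password(self):
        return self.ecs_password

    # Auto-release time: now plus the given number of minutes, formatted in UTC.
    def local2Utc(self, minutes):
        release_time = datetime.now() + timedelta(minutes=minutes)
        # e.g. 2021-08-25T16:28:00Z
        utc_str = datetime.utcfromtimestamp(release_time.timestamp()).strftime('%Y-%m-%dT%H:%M:00Z')
        return utc_str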

    # Create custom ECS instances, start them, and return the list of their public IPs.
    def create_instances(self, amount=None, auto_release_time_mins=None):
        running_instances_list = self.create_custom_multiple_instances(amount, auto_release_time_mins)
        return self.convert_instance_ip_list(running_instances_list)

    # https://partners-intl.aliyun.com/help/doc-detail/25506.htm?spm=a2c63.p38356.b99.674.569f59abJyS8k4
    # Create custom ECS instances and start them. Best not to exceed 10 instances per call.
    def create_custom_multiple_instances(self, amount=None, auto_release_time_mins=31):
        # Reject oversized batches before building the request.
        if amount and amount > 10:
            print("amount can't be greater than 10!")
            return []
        request = self.build_instances_request()
        if amount and amount > 0:
            request.set_Amount(amount)
        else:
            request.set_Amount(self.amount)

        # Pay by the traffic actually used.
        request.set_InternetChargeType("PayByTraffic")
        # Assign a public IP. This sets the bandwidth cap; billing is based on the traffic actually used.
        request.set_InternetMaxBandwidthOut(100)
        # Set the auto-release time.
        if auto_release_time_mins:
            if auto_release_time_mins < 31:
                print("The release time must be at least 31 minutes from now")
            request.set_AutoReleaseTime(self.local2Utc(auto_release_time_mins))
        else:
            request.set_AutoReleaseTime(self.auto_release_time)
        return self._execute_request(request)

    # Create a single custom ECS instance and start it.
    def create_single_instances(self):
        return self.create_custom_multiple_instances()

    # Create ECS instances and start them.
    def create_multiple_instances(self):
        request = self.build_instances_request()
        request.set_Amount(self.amount)
        return self._execute_request(request)

    # Create ECS instances and assign public IPs.
    def create_multiple_instances_with_public_ip(self):
        request = self.build_instances_request()
        request.set_Amount(self.amount)
        request.set_InternetMaxBandwidthOut(1)
        return self._execute_request(request)

    # Create ECS instances and set the auto-release time.
    def create_multiple_instances_with_auto_release_time(self):
        request = self.build_instances_request()
        request.set_Amount(self.amount)
        request.set_AutoReleaseTime(self.auto_release_time)
        return self._execute_request(request)

    def _execute_request(self, request):
        response = self._send_request(request)
        running_instances_list = []
        instance_ids = []
        # A missing response (request failed) is treated the same as an error response.
        if response and response.get('Code') is None:
            instance_ids = response.get('InstanceIdSets').get('InstanceIdSet')
            running_amount = 0
            amount = request.get_Amount()
            # Poll every 10 seconds until every created instance reports Running.
            while running_amount < amount:
                time.sleep(10)
                # Best not to exceed 10 instances, because one page of results returns at most 10.
                running_instances_list = self.get_running_instances(instance_ids)
                running_amount = len(running_instances_list)
                print("ecs instance running_amount:%s, amount:%s" % (running_amount, amount))
            print("ecs instance %s is running" % instance_ids)
        else:
            print("create ECS failure:", response)
        return running_instances_list

    # Count how many of the given instances are in the Running state.
    def check_instance_running(self, instance_ids, max_results=10):
        request = DescribeInstancesRequest()
        request.set_InstanceIds(json.dumps(instance_ids))
        request.set_MaxResults(max_results)
        response = self._send_request(request)
        running_count = 0
        if response and response.get('Code') is None:
            instances_list = response.get('Instances').get('Instance')
            for instance_detail in instances_list:
                if instance_detail.get('Status') == "Running":
                    running_count += 1
        return running_count

    # Get the details of the given ECS instances that are in the Running state.
    def get_running_instances(self, instance_ids, max_results=10):
        request = DescribeInstancesRequest()
        request.set_MaxResults(max_results)
        request.set_InstanceIds(json.dumps(instance_ids))
        response = self._send_request(request)
        running_instances_list = []
        if response and response.get('Code') is None:
            instances_list = response.get('Instances').get('Instance')
            for instance_detail in instances_list:
                if instance_detail.get('Status') == "Running":
                    running_instances_list.append(instance_detail)
        return running_instances_list

    def build_instances_request(self):
        request = RunInstancesRequest()
        request.set_ImageId(self.image_id)
        request.set_VSwitchId(self.vswitch_id)
        request.set_SecurityGroupId(self.security_group_id)
        instance_name = "Instance-Name-%s" % (datetime.now().strftime('%H%M'))
        request.set_InstanceName(instance_name)
        request.set_InstanceType(self.instance_type)
        request.set_SystemDiskCategory(self.system_disk_type)
        request.set_SystemDiskSize(20)
        request.set_Password(self.ecs_password)
        if self.zone_id:
            request.set_ZoneId(self.zone_id)
        # request.set_endpoint(self.endpoint)
        return request

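    # Send the API request and return the parsed JSON response.
    def _send_request(self, request):
        request.set_accept_format('json')
        try:
            response_str = self.clt.do_action(request)
            # logging.info(response_str)
            response_detail = json.loads(response_str)
            return response_detail
        except Exception as e:
            logging.error(e)
            # Return an error-shaped dict so callers that check response.get('Code') do not crash on None.
            # 'RequestFailed' is a label chosen here, not an API error code.
            return {'Code': 'RequestFailed', 'Message': str(e)}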

    # Read a local file and send its content to the given instances.
    def send_file_instances(self, instance_ids, file_name, path=None, fileMode="0755", retryTimes=5, sleepSeconds=3):
        # "not instance_ids" already covers both None and an empty list.
        if not instance_ids:
            print("ECS instance ids is blank!")
            return False
        if not path:
            path = os.getcwd()
        file_path = os.path.join(path, file_name)
        print("send file: " + file_path)
        if not os.path.exists(file_path):
            print("No such file:" + file_path)
            raise FileNotFoundError("No such file: " + file_path)

        with open(file_path, mode='r', encoding='utf-8') as f:
            contentStr = f.read()
        if not contentStr:
            print("file[%s] content is empty!" % file_path)
            return False

        sleep_send_file_seconds = 5
        print("start to send [%s], then sleep %ss" % (file_path, sleep_send_file_seconds))
        self.send_file_request(instance_ids, contentStr, file_name, fileMode, retryTimes, sleepSeconds)
        time.sleep(sleep_send_file_seconds)

    def send_file_request(self, instance_ids, contentStr, file_name="proxyReq.py", fileMode="0755", retryTimes=5, sleepSeconds=3):
        request = self._build_send_file_request(instance_ids, contentStr, file_name, fileMode)
        response = self._send_request(request)
        if response.get('Code') is None:
            print("send file[%s] success: " % file_name, response)
        else:
            print("send file[%s] failure: " % file_name, response)
            retryTimes = retryTimes - 1
            if retryTimes < 0:
                return None
            sleepSeconds *= 2
            print("sleep %ss to send file[%s], max retry times:%s!" % (sleepSeconds, file_name, retryTimes))
            time.sleep(sleepSeconds)
            self.send_file_request(instance_ids, contentStr, file_name, fileMode, retryTimes, sleepSeconds)

    def _build_send_file_request(self, instance_ids, content, file_name, fileMode):
        request = SendFileRequest()
        request.set_connect_timeout(60)  # timeout, in seconds
        request.set_Timeout(60)
        request.set_read_timeout(60)
        request.set_InstanceIds(instance_ids)
        request.set_ContentType("PlainText")
        request.set_Content(content)
        request.set_Name(file_name)
        request.set_FileMode(fileMode)  # 0755: the owner can read/write/execute; group and others can read and execute
        request.set_TargetDir(self.target_dir)
        # request.set_endpoint(self.endpoint)
        request.set_Overwrite(True)
        return request

    def run_command_request(self, instance_ids, run_content, work_dir="/tmp", retryTimes=3, sleepSeconds=3):
        request = self._build_run_command_request(instance_ids, run_content, work_dir)
        response = self._send_request(request)
        if response.get('Code') is None:
            print("run command success! instances:%s  response:%s" % (instance_ids, response))
        else:
            print("run command failure! instances:%s  response:%s" % (instance_ids, response))
            retryTimes = retryTimes - 1
            if retryTimes < 0:
                return None
            sleepSeconds *= 3
            print("sleep %ss to run command, max retry times:%s!" % (sleepSeconds, retryTimes))
            time.sleep(sleepSeconds)
            self.run_command_request(instance_ids, run_content, work_dir, retryTimes, sleepSeconds)

    def _build_run_command_request(self, instance_ids, run_content, work_dir):
        request = RunCommandRequest()
        request.set_connect_timeout(30)  # timeout, in seconds
        request.set_InstanceIds(instance_ids)
        request.set_Name("run_python")
        # request.set_endpoint(self.endpoint)
        request.set_Type("RunShellScript")
        request.set_ContentEncoding("PlainText")
        request.set_CommandContent(run_content)
        request.set_WorkingDir(work_dir)
        return request

    # Within one region, depending on your usage, you can keep at most 100-10000 Cloud Assistant commands
    # and run at most 2000-200000 Cloud Assistant commands per day. You can query the quotas with
    # DescribeAccountAttributes, or submit a ticket to adjust the retention and invocation quotas.
    # max-axt-command-count is currently 500.
    def acc_attr_request(self):
        request = self._build_acc_attr_request()
        response = self._send_request(request)
        print(response)

    def _build_acc_attr_request(self):
        request = DescribeAccountAttributesRequest()
        request.set_connect_timeout(30)  # timeout, in seconds
        # request.set_InstanceIds(['i-xxx'])
        # request.set_endpoint(self.endpoint)
        if self.zone_id:
            request.set_ZoneId(self.zone_id)
        return request

    # Query all regions.
    def regions_request(self):
        request = self._build_regions_request()
        response = self._send_request(request)
        print(response)

    def _build_regions_request(self):
        request = DescribeRegionsRequest()
        request.set_connect_timeout(30)  # timeout, in seconds
        # Resource type to query: instance or disk.
        request.set_ResourceType("instance")
        request.set_AcceptLanguage("zh-CN")
        return request

    # Delete a single instance (use with caution).
    def del_instance_request(self, instance_id, force=False):
        request = self._build_del_instance_request()
        request.set_InstanceId(instance_id)
        # If force is True, the instance is deleted whatever its status (including Running). Use with caution.
        request.set_Force(force)
        response = self._send_request(request)
        if response.get('Code') is None:
            print("del instance success: ", response)
        else:
            print("del instance failure: ", response)

    def _build_del_instance_request(self):
        request = DeleteInstanceRequest()
        request.set_connect_timeout(30)  # timeout, in seconds
        return request

    # Delete several instances at once (use with caution).
    # Named del_instances_request so it no longer overrides the single-instance del_instance_request.
    def del_instances_request(self, instance_ids, force=False):
        request = self._build_del_instances_request()
        request.set_InstanceIds(instance_ids)
        # If force is True, instances are deleted whatever their status (including Running). Use with caution.
        request.set_Force(force)
        response = self._send_request(request)
        print(response)

    def _build_del_instances_request(self):
        request = DeleteInstancesRequest()
        request.set_connect_timeout(30)  # timeout, in seconds
        return request

    # Delete all instances matching a query (use with caution). TODO
    def del_all_instances_by_query_request(self,
                                       instance_ids=None,
                                       instance_name=None,
                                       status="Stopped", # Pending Running Starting Stopping Stopped
                                       internet_charge_type="PayByTraffic",
                                       force=False,
                                       not_del_instance_ids=None):
        instances_list = self.list_instance_request(instance_ids, instance_name, status, internet_charge_type)
        # Keep querying and deleting until the query returns nothing left to delete.
        while instances_list:
            instance_id_list = []
            sleep_del_all_instance = 10
            for instance in instances_list:
                instance_id = instance.get("InstanceId")
                # Skip instances the caller asked to keep.
                if not_del_instance_ids and instance_id in not_del_instance_ids:
                    continue
                instance_id_list.append(instance_id)
                if len(instance_id_list) > 20:
                    self.del_instances_request(instance_id_list, force)
                    print("deleting instance ids:%s, then sleep %ss" % (instance_id_list, sleep_del_all_instance))
                    time.sleep(sleep_del_all_instance)
                    instance_id_list = []
            if len(instance_id_list) > 0:
                self.del_instances_request(instance_id_list, force)
                print("deleting instance ids:%s, then sleep %ss" % (instance_id_list, sleep_del_all_instance))
                time.sleep(sleep_del_all_instance)
            else:
                break
            instances_list = self.list_instance_request(instance_ids, instance_name, status, internet_charge_type)

    # Query all ECS instances that match the given filters.
    def list_instance_request(self,
                              instance_ids=None,
                              instance_name=None,
                              status=None, # Pending Running Starting Stopping Stopped
                              internet_charge_type="PayByTraffic",
                              max_results=10):
        request = self._build_list_instance_request(instance_ids, instance_name, status, internet_charge_type, max_results)
        response = self._send_request(request)
        if response.get('Code') is None:
            instances_list = response.get('Instances').get('Instance')
            return instances_list
        else:
            print(response)

    def _build_list_instance_request(self,
                                     instance_ids,
                                     instance_name,
                                     status, # Pending Running Starting Stopping Stopped
                                     internet_charge_type,
                                     max_results):
        request = DescribeInstancesRequest()
        request.set_MaxResults(max_results)
        request.set_connect_timeout(30)  # timeout, in seconds
        if instance_ids and len(instance_ids) > 0:
            # DescribeInstances takes InstanceIds as a JSON array string, as the other methods here pass it.
            if not isinstance(instance_ids, str):
                instance_ids = json.dumps(instance_ids)
            request.set_InstanceIds(instance_ids)
        if instance_name:
            request.set_InstanceName(instance_name)
        if status:
            # Pending Running Starting Stopping Stopped
            request.set_Status(status)
        # Only filter by vSwitch and security group when they were configured.
        if self.vswitch_id:
            request.set_VSwitchId(self.vswitch_id)
        if self.security_group_id:
            request.set_SecurityGroupId(self.security_group_id)
        request.set_InternetChargeType(internet_charge_type)
        if self.zone_id:
            request.set_ZoneId(self.zone_id)
        # request.set_endpoint(self.endpoint)
        return request


    # Extract the list of instance IDs from a list of instance details.
    def convert_instance_ids(self, instance_list):
        instance_ids = []
        if instance_list:
            for instance in instance_list:
                instance_ids.append(instance.get("InstanceId"))
        return instance_ids

    # Extract the list of public IPs from a list of instance details.
    def convert_instance_ip_list(self, instance_list):
        instance_ip_list = []
        if instance_list:
            for instance in instance_list:
                ip_addresses = (instance.get("PublicIpAddress") or {}).get("IpAddress") or []
                # Skip instances without a public IP instead of raising IndexError.
                if ip_addresses:
                    instance_ip_list.append(ip_addresses[0])
        return instance_ip_list

    # Run a .py script in the background with nohup
    def run_py_nohup(self, instance_ids, run_py_file):
        shell = "nohup python -u %s > %s.log 2>&1 &" % (run_py_file, run_py_file)
        self.run_shell(instance_ids, shell)

    def run_shell(self, instance_ids, shell):
        self.run_command_request(instance_ids, shell)

def test_run():
    print("hello ecs batch create instance")
    # Your AccessKey ID and AccessKey secret.
    # ak_id = "xx"
    # ak_secret = "xxx"
    # region_id = "cn-hongkong"
    # ecs = EcsInstance(ak_id, ak_secret, region_id=region_id)
    # print(ecs.create_instances(1))

    # ecs.regions_request()

    # run_py_file = "proxyReq.py"
    # run_py_file_path = os.path.join(os.getcwd(), run_py_file)
    # if not os.path.exists(run_py_file_path):
    #     raise FileNotFoundError("No such file: " + run_py_file_path)
    #
    # contentStr = ""
    # with open(run_py_file_path, mode='r', encoding='utf-8') as f:
    #     for line in f:
    #         contentStr += line
    # if not contentStr:
    #     print("The file content must not be empty")
    # instance_ids = ['i-xxxx']
    # ecs.send_file_request(instance_ids, contentStr)
    # ecs.run_command_request(instance_ids, "nohup python -u t.py > out.log 2>&1 &")

    # ecs.acc_attr_request()
    # print(ecs.ecs_password)
    # Create custom ECS instances and start them.
    # create_custom_multiple_instances()

    # Create ECS instances and start them.
    # create_multiple_instances()
    # Create ECS instances with public IPs bound.
    # create_multiple_instances_with_public_ip()
    # Create ECS instances and set the auto-release time.
    # create_multiple_instances_with_auto_release_time()
if __name__ == '__main__':
    # test_run()
    pass
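
The commented-out calls in test_run() hint at the intended order of operations: create the instances, send a script to them, then start it in the background. Below is a minimal sketch of that flow, not the author's own driver code. The function name run_script_on_new_instances is hypothetical; the `ecs` argument is assumed to be an object with the same methods as the EcsInstance class above, and the script file is assumed to sit in the current working directory.

```python
def run_script_on_new_instances(ecs, script_name, amount=1, release_minutes=31):
    """Create instances, upload script_name, and start it in the background.

    `ecs` is expected to behave like the EcsInstance class from the post.
    Returns the IDs of the instances the script was started on.
    """
    # Blocks until the new instances report Running (see _execute_request).
    instances = ecs.create_custom_multiple_instances(amount, release_minutes)
    instance_ids = ecs.convert_instance_ids(instances)
    if not instance_ids:
        # Nothing reached the Running state, so there is nothing to upload to.
        return []
    # Upload the script; the post's default target directory is /tmp.
    ecs.send_file_instances(instance_ids, script_name)
    # Launch it with nohup; the post's default working directory is also /tmp.
    ecs.run_py_nohup(instance_ids, script_name)
    return instance_ids
```

With real credentials, `ecs` would be built as in test_run(), together with the vswitch_id, image_id and security_group_id values for your own account. Because the helper only relies on method names, it can be tried out first against a stand-in object that records the calls.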


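chinalys posted on 2021-11-12 11:08
Thanks for sharing, learned something.
207xb posted on 2021-11-12 11:13
Last edited by 207xb on 2021-11-12 11:19

Thanks for sharing, learned a lot.
尘缘丶 posted on 2021-11-12 11:22
OP | qiujw posted on 2021-11-12 11:27
尘缘丶 posted on 2021-11-12 11:22
Anyone who actually buys with this software must be rich, right?

Batch-buy 10 instances, set a timed release, use them for about half an hour to run the script, and it only costs a few yuan.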
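
The timed release mentioned here is the auto-release time that the script formats in local2Utc. As a rough sketch of the same calculation with timezone-aware datetimes: the function name auto_release_time and the `now` parameter are hypothetical, and the 31-minute minimum is the value the script itself uses, not something checked against the API here.

```python
from datetime import datetime, timedelta, timezone

# Minimum lead time used by the script in the post.
MIN_RELEASE_MINUTES = 31


def auto_release_time(minutes, now=None):
    """Return the release time `minutes` from now as a UTC string (seconds set to 00).

    `now` must be a timezone-aware datetime; it defaults to the current UTC time.
    """
    if minutes < MIN_RELEASE_MINUTES:
        raise ValueError("release time must be at least %d minutes away" % MIN_RELEASE_MINUTES)
    if now is None:
        now = datetime.now(timezone.utc)
    # Convert to UTC first so the result does not depend on the local timezone.
    release_at = now.astimezone(timezone.utc) + timedelta(minutes=minutes)
    return release_at.strftime('%Y-%m-%dT%H:%M:00Z')
```

Passing `now` explicitly makes the result reproducible; left at its default, the helper uses the current time, as the script does.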
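飘零的殇 posted on 2021-11-12 11:33
What kind of promotion is this, and what do you get in the end?
OP | qiujw posted on 2021-11-12 11:37
飘零的殇 posted on 2021-11-12 11:33
What kind of promotion is this, and what do you get in the end?

It suits anyone who needs to temporarily buy N servers to run some tasks, such as crawlers or flash-sale scripts, and then have them released automatically: one-click purchase, plus execution of a script you specify.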
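
Since the script recommends creating no more than 10 instances per call, a need for N servers has to be split into several calls. A minimal sketch of that split follows; the helper name split_into_batches is hypothetical, and the default of 10 simply mirrors the limit used in the script.

```python
def split_into_batches(total, batch_size=10):
    """Split `total` instances into per-call amounts of at most `batch_size`."""
    if total < 0 or batch_size <= 0:
        raise ValueError("total must be >= 0 and batch_size must be > 0")
    full_batches, remainder = divmod(total, batch_size)
    batches = [batch_size] * full_batches
    if remainder:
        batches.append(remainder)
    return batches


if __name__ == "__main__":
    # Each entry is the amount to pass to one create_instances() call.
    print(split_into_batches(25))
```
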
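飘零的殇 posted on 2021-11-12 13:58
qiujw posted on 2021-11-12 11:37
It suits anyone who needs to temporarily buy N servers to run some tasks, such as crawlers or flash-sale scripts, and then have them released automatically: one-click purchase, plus execution of a script you ...

The OP is awesome as always.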