[Python] 纯文本查看 复制代码
# coding=utf-8
# if the python sdk is not install using 'sudo pip install aliyun-python-sdk-ecs'
# if the python sdk is install using 'sudo pip install --upgrade aliyun-python-sdk-ecs'
# make sure the sdk version is 4.4.3, you can use command 'pip show aliyun-python-sdk-ecs' to check
import json
import logging
import os
import time
from datetime import datetime, timedelta
from aliyunsdkcore import client
from aliyunsdkecs.endpoint import EndpointData
from aliyunsdkecs.request.v20140526.DeleteInstanceRequest import DeleteInstanceRequest
from aliyunsdkecs.request.v20140526.DeleteInstancesRequest import DeleteInstancesRequest
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
from aliyunsdkecs.request.v20140526.DescribeRegionsRequest import DescribeRegionsRequest
from aliyunsdkecs.request.v20140526.RunInstancesRequest import RunInstancesRequest
from aliyunsdkecs.request.v20140526.SendFileRequest import SendFileRequest
from aliyunsdkecs.request.v20140526.RunCommandRequest import RunCommandRequest
from aliyunsdkecs.request.v20140526.DescribeAccountAttributesRequest import DescribeAccountAttributesRequest
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S')
ecs_pwd="abcd123."
# 文档:https://partners-intl.aliyun.com/help/doc-detail/25506.htm?spm=a2c63.p38356.b99.674.569f59abJyS8k4
# 管理ECS(创建,执行脚本等)
class EcsInstance():
def __init__(self,
ak_id,
ak_secret,
# 4c 8g
instance_type="ecs.c6e.xlarge",
vswitch_id=None,
image_id=None,
# 当前VPC类型的安全组
security_group_id= None,
region_id = None, #"cn-hongkong",
zone_id = None, #"cn-hongkong-b",
endpoint = None, #"ecs-cn-hangzhou.aliyuncs.com"
system_disk_type="cloud_essd",
ecs_password=ecs_pwd,
target_dir="/tmp",
auto_release_time = None
):
print("#######New EcsInstance()#######")
# 设置地域。如不知道是哪个,可以调用查询所有地域方法 regions_request()
self.region_id = region_id
self.zone_id = zone_id
if region_id is None:
print("region Id can not be empty!")
if endpoint is None:
region_endpoint = EndpointData().getEndpointMap().get(region_id)
if region_endpoint is None:
region_endpoint = "ecs-cn-hangzhou.aliyuncs.com"
self.endpoint = region_endpoint
else:
self.endpoint = endpoint
if not ak_id or len(ak_id) <= 0:
print("access key Id can not be empty!")
if not ak_secret or len(ak_secret) <= 0:
print("access key secret can not be empty!")
self.clt = client.AcsClient(ak_id, ak_secret, self.region_id)
# 设置实例规格。
# 4c 8g
# instance_type = "ecs.c6e.xlarge"
#4c 16g
# instance_type = "ecs.g6e.xlarge"
self.instance_type = instance_type
# 选择的交换机
self.vswitch_id = vswitch_id
# 使用的镜像信息。
self.image_id = image_id
# 当前VPC类型的安全组。
self.security_group_id = security_group_id
# 批量创建ECS实例的数量, 取值范围:1-100, 默认值:1。
self.amount = 1
# 自动释放时间。使用UTC时间,格式为 yyyy-MM-ddTHH:mm:ssZ。最短释放时间为当前时间半小时之后;最长释放时间不能超过当前时间三年。
# 最小释放时间不能小于31分钟
if not auto_release_time:
self.auto_release_time = self.local2Utc(31)#"2021-09-26T18:20Z"
else:
self.auto_release_time = auto_release_time
self.system_disk_type = system_disk_type
self.ecs_password = ecs_password
self.target_dir = target_dir
# @property
def ecs_password(self):
return self.ecs_password
# 自动释放时间。使用UTC时间
def local2Utc(self, minutes):
currentTime = datetime.now() + timedelta(minutes=minutes)
# 2021-08-25T16:28Z
str = datetime.utcfromtimestamp(currentTime.timestamp()).strftime('%Y-%m-%dT%H:%M:00Z')
return str
# return currentTime.strftime('%Y-%m-%dT%H:%M:00Z')
# 创建自定义ECS实例并启动,返回ip列表
def create_instances(self, amount=None, auto_release_time_mins=None):
running_instances_list = self.create_custom_multiple_instances(amount, auto_release_time_mins)
return self.convert_instance_ip_list(running_instances_list)
# https://partners-intl.aliyun.com/help/doc-detail/25506.htm?spm=a2c63.p38356.b99.674.569f59abJyS8k4
# 创建自定义ECS实例并启动。 最好不要超过10台
def create_custom_multiple_instances(self, amount=None, auto_release_time_mins=31):
request = self.build_instances_request()
if amount and amount > 0:
request.set_Amount(amount)
else:
request.set_Amount(self.amount)
if amount and amount > 10:
print("The maximum number of amount Can't be greater than 10!")
return
# 表示按使用流量付费
request.set_InternetChargeType("PayByTraffic")
# 分配公网IP, 取带宽的上限设置,计费以实际使用的网络流量为依据
request.set_InternetMaxBandwidthOut(100)
# 设置自动释放时间
if auto_release_time_mins:
if auto_release_time_mins < 31:
print("最小释放时间不能小于31分钟")
request.set_AutoReleaseTime(self.local2Utc(auto_release_time_mins))
else:
request.set_AutoReleaseTime(self.auto_release_time)
return self._execute_request(request)
# 创建自定义ECS实例并启动。
def create_single_instances(self):
return self.create_custom_multiple_instances()
# 创建ECS实例并启动。
def create_multiple_instances(self):
request = self.build_instances_request()
request.set_Amount(self.amount)
self._execute_request(request)
# 创建ECS实例并分配公网IP。
def create_multiple_instances_with_public_ip(self):
request = self.build_instances_request()
request.set_Amount(self.amount)
request.set_InternetMaxBandwidthOut(1)
self._execute_request(request)
# 创建ECS实例并设置自动释放时间。
def create_multiple_instances_with_auto_release_time(self):
request = self.build_instances_request()
request.set_Amount(self.amount)
request.set_AutoReleaseTime(self.auto_release_time)
self._execute_request(request)
def _execute_request(self, request):
response = self._send_request(request)
running_instances_list = []
instance_ids = []
if response.get('Code') is None:
instance_ids = response.get('InstanceIdSets').get('InstanceIdSet')
running_amount = 0
amount = request.get_Amount()
while running_amount < amount:
time.sleep(10)
# 最好不要超过10台,因为分页里最多拿10台
running_instances_list = self.get_running_instances(instance_ids)
running_amount = len(running_instances_list)
print("ecs instance running_amount:%s, amount:%s" % (running_amount, amount))
else:
print("create ECS failure:", response)
print("ecs instance %s is running" % instance_ids)
return running_instances_list
def check_instance_running(self, instance_ids, max_results=10):
request = DescribeInstancesRequest()
request.set_InstanceIds(json.dumps(instance_ids))
request.set_MaxResults(max_results)
response = self._send_request(request)
if response.get('Code') is None:
instances_list = response.get('Instances').get('Instance')
running_count = 0
for instance_detail in instances_list:
if instance_detail.get('Status') == "Running":
running_count += 1
return running_count
#获取运行中的实例ecs
def get_running_instances(self, instance_ids, max_results=10):
request = DescribeInstancesRequest()
request.set_MaxResults(max_results)
request.set_InstanceIds(json.dumps(instance_ids))
response = self._send_request(request)
running_instances_list = []
if response.get('Code') is None:
instances_list = response.get('Instances').get('Instance')
for instance_detail in instances_list:
if instance_detail.get('Status') == "Running":
running_instances_list.append(instance_detail)
return running_instances_list
def build_instances_request(self):
request = RunInstancesRequest()
request.set_ImageId(self.image_id)
request.set_VSwitchId(self.vswitch_id)
request.set_SecurityGroupId(self.security_group_id)
intance_name = "Instance-Name-%s" % (datetime.now().strftime('%H%M'))
request.set_InstanceName(intance_name)
request.set_InstanceType(self.instance_type)
request.set_SystemDiskCategory(self.system_disk_type)
request.set_SystemDiskSize(20)
request.set_Password(self.ecs_password)
if self.zone_id:
request.set_ZoneId(self.zone_id)
# request.set_endpoint(self.endpoint)
return request
# 发送API请求
def _send_request(self, request):
request.set_accept_format('json')
try:
response_str = self.clt.do_action(request)
# logging.info(response_str)
response_detail = json.loads(response_str)
return response_detail
except Exception as e:
logging.error(e)
def send_file_instances(self, instance_ids, file_name, path=None, fileMode="0755", retryTimes=5, sleepSeconds=3):
if not instance_ids and len(instance_ids) <= 0:
print("ECS instance ids is Blank!")
return False
if not path:
path = os.getcwd()
file_path = os.path.join(path, file_name)
print("send file: " + file_path)
if not os.path.exists(file_path):
print("No such file:" + file_path)
raise FileNotFoundError("No such file: " + file_path)
contentStr = ""
with open(file_path, mode='r', encoding='utf-8') as f:
for line in f:
contentStr += line
if not contentStr:
print("file[%s] content is empty!" % file_path)
return False
sleep_send_file_seconds = 5
print("sleep %s, start to send [%s] success:" % (sleep_send_file_seconds, file_path))
self.send_file_request(instance_ids, contentStr, file_name, fileMode, retryTimes, sleepSeconds)
time.sleep(sleep_send_file_seconds)
def send_file_request(self, instance_ids, contentStr, file_name="proxyReq.py", fileMode="0755", retryTimes=5, sleepSeconds=3):
request = self._build_send_file_request(instance_ids, contentStr, file_name, fileMode)
response = self._send_request(request)
if response.get('Code') is None:
print("send file[%s] success: " % file_name, response)
else:
print("send file[%s] failure: " % file_name, response)
retryTimes = retryTimes - 1
if retryTimes < 0:
return None
sleepSeconds *= 2
print("sleep %ss to send file[%s], max retry times:%s!" % (sleepSeconds, file_name, retryTimes))
time.sleep(sleepSeconds)
self.send_file_request(instance_ids, contentStr, file_name, fileMode, retryTimes, sleepSeconds)
def _build_send_file_request(self, instance_ids, content, file_name, fileMode):
request = SendFileRequest()
request.set_connect_timeout(60)#超时、秒
request.set_Timeout(60)
request.set_read_timeout(60)
request.set_InstanceIds(instance_ids)
request.set_ContentType("PlainText")
request.set_Content(content)
request.set_Name(file_name)
request.set_FileMode(fileMode)#0755 可执行 即用户具有读/写/执行权限,组用户和其它用户具有读写权限
request.set_TargetDir(self.target_dir)
# request.set_endpoint(self.endpoint)
request.set_Overwrite(True)
return request
def run_command_request(self, instance_ids, run_content, work_dir="/tmp", retryTimes=3, sleepSeconds=3):
request = self._build_run_command_request(instance_ids, run_content, work_dir)
response = self._send_request(request)
if response.get('Code') is None:
print("run command success! instances:%s response:%s" % (instance_ids, response))
else:
print("run command failure! instances:%s response:%s" % (instance_ids, response))
retryTimes = retryTimes - 1
if retryTimes < 0:
return None
sleepSeconds *= 3
print("sleep %ss to run command, max retry times:%s!" % (sleepSeconds, retryTimes))
time.sleep(sleepSeconds)
self.run_command_request(instance_ids, run_content, work_dir, retryTimes, sleepSeconds)
def _build_run_command_request(self, instance_ids, run_content, work_dir):
request = RunCommandRequest()
request.set_connect_timeout(30) # 超时、秒
request.set_InstanceIds(instance_ids)
request.set_Name("run_python")
# request.set_endpoint(self.endpoint)
request.set_Type("RunShellScript")
request.set_ContentEncoding("PlainText")
request.set_CommandContent(run_content)
request.set_WorkingDir(work_dir)
return request
# 在一个地域下,根据您的使用情况,最多可以保有100-10000条云助手命令,
# 每天最多可以执行2000-200000条云助手命令。您可以通过DescribeAccountAttribute查询配额情况,
# 也可以提交工单调整保有量配额和调用次数配额。
# 目前max-axt-command-count 为 500
def acc_attr_request(self):
request = self._build_acc_attr_request()
response = self._send_request(request)
if response.get('Code') is None:
print(response)
else:
print(response)
def _build_acc_attr_request(self):
request = DescribeAccountAttributesRequest()
request.set_connect_timeout(30)#超时、秒
# request.set_InstanceIds(['i-xxx'])
# request.set_endpoint(self.endpoint)
if self.zone_id:
request.set_ZoneId(self.zone_id)
return request
# 查询所有地域
def regions_request(self):
request = self._build_regions_request()
request.set_accept_format('json')
response = self._send_request(request)
if response.get('Code') is None:
print(response)
else:
print(response)
def _build_regions_request(self):
request = DescribeRegionsRequest()
request.set_connect_timeout(30)#超时、秒
# 查询类型instance、disk
request.set_ResourceType("instance")
request.set_AcceptLanguage("zh-CN")
return request
# 删除实例(慎用)
def del_instance_request(self, instance_id, force=False):
request = self._build_del_instance_request()
request.set_accept_format('json')
request.set_InstanceId(instance_id)
# force 为True, 则所有状态(包括Running) 也删除 慎用
request.set_Force(force)
response = self._send_request(request)
if response.get('Code') is None:
print("del instance success: ", response)
else:
print("del instance failure: ", response)
def _build_del_instance_request(self):
request = DeleteInstanceRequest()
request.set_connect_timeout(30) # 超时、秒
return request
# 删除实例(慎用)
def del_instance_request(self, instance_ids, force=False):
request = self._build_del_instances_request()
request.set_accept_format('json')
request.set_InstanceIds(instance_ids)
# force 为True, 则所有状态(包括Running) 也删除 慎用
request.set_Force(force)
response = self._send_request(request)
if response.get('Code') is None:
print(response)
else:
print(response)
def _build_del_instances_request(self):
request = DeleteInstancesRequest()
request.set_connect_timeout(30) # 超时、秒
return request
# 删除实例(慎用)TODO
def del_all_instances_by_query_request(self,
instance_ids=None,
instance_name=None,
status="Stopped", # Pending Running Starting Stopping Stopped
internet_charge_type="PayByTraffic",
force=False,
not_del_instance_ids=None):
instances_list = self.list_instance_request(instance_ids, instance_name, status, internet_charge_type)
while(instances_list and len(instances_list) > 0):
instance_id_list = []
sleep_del_all_instance = 10
if instances_list:
for instance in instances_list:
instance_id = instance.get("InstanceId")
if not_del_instance_ids and instance_id in not_del_instance_ids:
continue
instance_id_list.append(instance_id)
if len(instance_id_list) > 20:
self.del_instance_request(instance_id_list, force)
print("sleep %ss, start to del instance ids:%s" % (sleep_del_all_instance, instance_id_list))
time.sleep(sleep_del_all_instance)
instance_id_list = []
if len(instance_id_list) > 0:
self.del_instance_request(instance_id_list, force)
print("sleep %ss, start to del instance ids:%s" % (sleep_del_all_instance, instance_id_list))
time.sleep(sleep_del_all_instance)
pass
else:
break
instances_list = self.list_instance_request(instance_ids, instance_name, status, internet_charge_type)
# 查询所有实例ECS
def list_instance_request(self,
instance_ids=None,
instance_name=None,
status=None, # Pending Running Starting Stopping Stopped
internet_charge_type="PayByTraffic",
max_results=10):
request = self._build_list_instance_request(instance_ids, instance_name, status, internet_charge_type, max_results)
request.set_accept_format('json')
response = self._send_request(request)
if response.get('Code') is None:
instances_list = response.get('Instances').get('Instance')
return instances_list
else:
print(response)
def _build_list_instance_request(self,
instance_ids,
instance_name,
status, # Pending Running Starting Stopping Stopped
internet_charge_type,
max_results):
request = DescribeInstancesRequest()
request.set_MaxResults(max_results)
request.set_connect_timeout(30) # 超时、秒
if instance_ids and len(instance_ids) > 0:
request.set_InstanceIds(instance_ids)
if instance_name:
request.set_InstanceName(instance_name)
if status:
# Pending Running Starting Stopping Stopped
request.set_Status(status)
request.set_VSwitchId(self.vswitch_id)
request.set_SecurityGroupId(self.security_group_id)
request.set_InternetChargeType(internet_charge_type)
if self.zone_id:
request.set_ZoneId(self.zone_id)
# request.set_endpoint(self.endpoint)
return request
# 创建自定义ECS实例并启动,返回ids列表
def convert_instance_ids(self, instance_list):
instance_ids = []
if instance_list:
for instance in instance_list:
instance_ids.append(instance.get("InstanceId"))
return instance_ids
# 创建自定义ECS实例并启动,返回ips列表
def convert_instance_ip_list(self, instance_list):
instance_ip_list = []
if instance_list:
for instance in instance_list:
instance_ip_list.append(instance.get("PublicIpAddress").get("IpAddress")[0])
return instance_ip_list
# 后台运行py脚本 nohup
def run_py_nohup(self, instance_ids, run_py_file):
shell = "nohup python -u %s > %s.log 2>&1 &" % (run_py_file, run_py_file)
self.run_shell(instance_ids, shell)
def run_shell(self, instance_ids, shell):
self.run_command_request(instance_ids, shell)
def test_run():
print("hello ecs batch create instance")
# 您的access key secret。
# ak_id = "xx"
# ak_secret = "xxx"
# region_id = "cn-hongkong"
# ecs = EcsInstance(ak_id, ak_secret, region_id=region_id)
# print(ecs.create_instances(1))
# ecs.regions_request()
# run_py_file = "proxyReq.py"
# run_py_file_path = os.path.join(os.getcwd(), run_py_file)
# if not os.path.exists(run_py_file_path):
# raise FileNotFoundError("No such file: " + run_py_file_path)
#
# contentStr = ""
# with open(run_py_file_path, mode='r', encoding='utf-8') as f:
# for line in f:
# contentStr += line
# if not contentStr:
# print("读取文件内容不能为空")
# instance_ids = ['i-xxxx']
# ecs.send_file_request(instance_ids, contentStr)
# ecs.run_command_request(instance_ids, "nohup python -u t.py > out.log 2>&1 &")
# ecs.acc_attr_request()
# print(ecs.ecs_password)
# 创建自定义ECS实例并启动。
# create_custom_multiple_instances()
# 创建ECS实例并启动。
# create_multiple_instances()
# 创建绑定公网IP的ECS实例。
# create_multiple_instances_with_public_ip()
# 创建ECS实例并设置自动释放时间。
# create_multiple_instances_with_auto_release_time()
if __name__ == '__main__':
# test_run()
pass