【Python】阿里云DDNS动态域名解析,无需SDK
本帖最后由 辣丝丝小白菜 于 2020-11-4 11:42 编辑阿里云DDNS动态域名解析-易语言版:https://www.52pojie.cn/thread-1294027-1-1.html
阿里云DDNS动态域名解析-PythonSDK版:https://www.52pojie.cn/thread-783673-1-1.html
此版本不需要专门下载阿里云的SDK,一个文件即可,已经集成了鉴权等操作。
import re, json, time, uuid, random, chardet
import hmac, base64, hashlib
import requests
import urllib.parse, urllib.request
# 轮询时间间隔
TIME_INTERVAL = 5
class PublicIP(object):
def __init__(self):
self.user_agent = ('Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3776.400 QQBrowser/10.6.4212.400')
self.api_list = [
'https://202020.ip138.com/',
'https://www.ip.cn/api/index?ip=&type=0',
'http://ip.webmasterhome.cn/',
'http://ip.yqie.com/clientip.aspx',
'https://www.123cha.com/ip/',
'http://ip.chacuo.net/',
]
def ip_query(self):
while True:
url = random.sample(self.api_list, 1)
try:
res = requests.get(url, headers={'User-Agent':self.user_agent}, timeout=5)
encoding = chardet.detect(res.content)['encoding']
html = res.content.decode(encoding)
out = re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', html)
if out != []: return out
except Exception as e:
continue
class AscSigner(object):
def __init__(self, AccessKeyId, AccessKeySecret):
self.AccessKeyId = AccessKeyId
self.AccessKeySecret = AccessKeySecret
def _utc(self):
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def _uuid(self):
return str(uuid.uuid4())
def _pop_standard_urlencode(self, query):
ret = query.replace('+', '%20')
ret = ret.replace('*', '%2A')
ret = ret.replace('%7E', '~')
return ret
def _common_param(self):
return {
'Format': 'JSON',
'Version': '2015-01-09',
'SignatureMethod': 'HMAC-SHA1',
'SignatureNonce': self._uuid(),
'SignatureVersion': '1.0',
'AccessKeyId': self.AccessKeyId,
'Timestamp': self._utc(),
}
def _string_to_sign(self, param):
sorted_parameters = sorted(list(param.items()), key=lambda param: param)
sorted_query_string = self._pop_standard_urlencode(urllib.parse.urlencode(sorted_parameters))
canonicalized_query_string = self._pop_standard_urlencode(urllib.request.pathname2url(sorted_query_string))
string_to_sign = "GET&%2F&" + canonicalized_query_string
return string_to_sign
def _sign(self, string_to_sign):
param_encode = bytes(string_to_sign, 'utf-8')
key = bytes(self.AccessKeySecret + '&', 'utf-8')
h = hmac.new(key, param_encode, hashlib.sha1)
return str(base64.encodebytes(h.digest()).strip(), "utf-8")
def get_param_and_sign(self, action_param):
common_param = self._common_param()
param = dict(common_param, **action_param)
string_to_sign = self._string_to_sign(param)
signature = self._sign(string_to_sign)
param['Signature'] = signature
url = '?' + self._pop_standard_urlencode(urllib.parse.urlencode(param))
return url
class AscRequestParam(object):
def __init__(self, AccessKeyId, AccessKeySecret):
self.signer = AscSigner(AccessKeyId, AccessKeySecret)
# 获取域名列表
def get_domains_params(self):
action_param = {
'Action':'DescribeDomains',
'PageSize':'100',
}
return self.signer.get_param_and_sign(action_param)
# 获取解析记录
def get_domain_records_params(self, domain_name):
action_param = {
'Action':'DescribeDomainRecords',
'DomainName': domain_name,
'PageSize':'100',
}
return self.signer.get_param_and_sign(action_param)
# 修改解析记录
def update_domain_record_params(self, data):
action_param = {
'Action':'UpdateDomainRecord',
'RR': data['RR'],
'RecordId': data['RecordId'],
'Type': data['Type'],
'Value' :data['Value'],
}
return self.signer.get_param_and_sign(action_param)
# 获取解析记录信息
def get_domain_record_info_params(self, RecordId):
action_param = {
'Action':'DescribeDomainRecordInfo',
'RecordId': RecordId,
}
return self.signer.get_param_and_sign(action_param)
class AscClient(object):
def __init__(self, AccessKeyId, AccessKeySecret):
self.ip = PublicIP()
self.client = AscRequestParam(AccessKeyId, AccessKeySecret)
self.api = 'https://alidns.aliyuncs.com/'
def _request(self, params):
res = requests.get(self.api + params)
return json.loads(res.content.decode('utf-8'))
def _wait(self):
timecount = TIME_INTERVAL
while timecount >= 0:
print(f"\r再次检测倒计时:{timecount}秒...", end="", flush=True)
timecount = timecount - 1
time.sleep(1)
print('\n\n', end="", flush=True)
def update_domain_record(self, record):
ip = self.ip.ip_query()
print(f'公网IP:{ip},')
if ip == record['Value']:
print('公网IP和解析IP相同,无需更新')
return
record['Value'] = ip
params = self.client.update_domain_record_params(record)
self._request(params)
print('解析完成')
def get_domain_records(self, RR, domain_name):
params = self.client.get_domain_records_params(domain_name)
res = self._request(params)
TotalCount = int(res['TotalCount'])
for i in res['DomainRecords']['Record']:
if i['RR'] == RR:
return i
return None
def get_domains(self):
params = self.client.get_domains_params()
res = self._request(params)
domain_list = []
for i in res['Domains']['Domain']:
domain_list.append(i['DomainName'])
return int(res['TotalCount']),domain_list
def start(self,RR,domain_name):
while True:
count, domains = self.get_domains()
if count <= 0 or not domain_name in domains:
print('没有此域名')
return
record = self.get_domain_records(RR, domain_name)
if not record:
print(f'没有找到{RR}的解析记录')
return
if not record['Type'] == 'A':
print('暂时只支持解析A记录')
return
if record['Status'] == 'DISABLE':
print('此记录已被禁用,请启用后再试')
return
self.update_domain_record(record)
self._wait()
if __name__ == '__main__':
# 换成你自己的AccessKeyID, AccessKeySecret
a = AscClient('**********','**********************')
# 如果需要解析 www.baidu.com,填入 www 和 baidu.com
# 如果需要解析 baidu.com填入 @ 和 baidu.com
a.start('test','mydomain.net') 我在linux系统中 使用python main.py 打开文件 总是报错 跪求解决
print(f"\r再次检测倒计时:{timecount}秒...", end="", flush=True)
^
SyntaxError: invalid syntax
senooo 发表于 2020-11-4 13:36
这个是用来干嘛用的,是内网穿透吗 还是自己家里搭建服务 upnp DMZ
开了 公网IP后,可以把域名解析到自己的电脑/NAS。但是自己的网络IP地址不固定,经常换,这个就是检测公网IP 然后自动更新解析。 这个是用来干嘛用的,是内网穿透吗 还是自己家里搭建服务 upnp DMZ 这个有什么用呢,爬取淘宝数据吗? 电信的打客服电话30分钟直接就开通了不懂就问,这个 要怎么说呢 …… 厉害的人物,来学习 小小欣 发表于 2020-11-4 17:50
电信的打客服电话30分钟直接就开通了不懂就问,这个 要怎么说呢
就说 要做监控 要开公网IP ,远程监控之类的 怎么用啊?没看明白
这个很厉害,收藏了
页:
[1]
2