辣丝丝小白菜 发表于 2020-11-4 11:41

【Python】阿里云DDNS动态域名解析,无需SDK

本帖最后由 辣丝丝小白菜 于 2020-11-4 11:42 编辑

阿里云DDNS动态域名解析-易语言版:https://www.52pojie.cn/thread-1294027-1-1.html
阿里云DDNS动态域名解析-PythonSDK版:https://www.52pojie.cn/thread-783673-1-1.html

此版本不需要专门下载阿里云的SDK,一个文件即可,已经集成了鉴权等操作。


import re, json, time, uuid, random, chardet
import hmac, base64, hashlib
import requests
import urllib.parse, urllib.request

# 轮询时间间隔
TIME_INTERVAL = 5


class PublicIP(object):
    def __init__(self):
      self.user_agent = ('Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
      'Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3776.400 QQBrowser/10.6.4212.400')
      self.api_list = [
            'https://202020.ip138.com/',
            'https://www.ip.cn/api/index?ip=&type=0',
            'http://ip.webmasterhome.cn/',
            'http://ip.yqie.com/clientip.aspx',
            'https://www.123cha.com/ip/',
            'http://ip.chacuo.net/',
      ]

    def ip_query(self):
      while True:
            url = random.sample(self.api_list, 1)
            try:
                res = requests.get(url, headers={'User-Agent':self.user_agent}, timeout=5)
                encoding = chardet.detect(res.content)['encoding']
                html = res.content.decode(encoding)
                out = re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', html)
                if out != []: return out
            except Exception as e:
                continue

class AscSigner(object):
    def __init__(self, AccessKeyId, AccessKeySecret):
      self.AccessKeyId = AccessKeyId
      self.AccessKeySecret = AccessKeySecret

    def _utc(self):
      return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    def _uuid(self):
      return str(uuid.uuid4())

    def _pop_standard_urlencode(self, query):
      ret = query.replace('+', '%20')
      ret = ret.replace('*', '%2A')
      ret = ret.replace('%7E', '~')
      return ret

    def _common_param(self):
      return {
            'Format': 'JSON',
            'Version': '2015-01-09',
            'SignatureMethod': 'HMAC-SHA1',
            'SignatureNonce': self._uuid(),
            'SignatureVersion': '1.0',
            'AccessKeyId': self.AccessKeyId,
            'Timestamp': self._utc(),
      }

    def _string_to_sign(self, param):
      sorted_parameters = sorted(list(param.items()), key=lambda param: param)
      sorted_query_string = self._pop_standard_urlencode(urllib.parse.urlencode(sorted_parameters))
      canonicalized_query_string = self._pop_standard_urlencode(urllib.request.pathname2url(sorted_query_string))
      string_to_sign = "GET&%2F&" + canonicalized_query_string
      return string_to_sign

    def _sign(self, string_to_sign):
      param_encode = bytes(string_to_sign, 'utf-8')
      key = bytes(self.AccessKeySecret + '&', 'utf-8')
      h = hmac.new(key, param_encode, hashlib.sha1)
      return str(base64.encodebytes(h.digest()).strip(), "utf-8")

    def get_param_and_sign(self, action_param):
      common_param = self._common_param()
      param = dict(common_param, **action_param)
      string_to_sign = self._string_to_sign(param)
      signature = self._sign(string_to_sign)
      param['Signature'] = signature
      url = '?' + self._pop_standard_urlencode(urllib.parse.urlencode(param))
      return url

class AscRequestParam(object):
    def __init__(self, AccessKeyId, AccessKeySecret):
      self.signer = AscSigner(AccessKeyId, AccessKeySecret)

    # 获取域名列表
    def get_domains_params(self):
      action_param = {
                'Action':'DescribeDomains',
                'PageSize':'100',
            }
      return self.signer.get_param_and_sign(action_param)

    # 获取解析记录
    def get_domain_records_params(self, domain_name):
      action_param = {
                'Action':'DescribeDomainRecords',
                'DomainName': domain_name,
                'PageSize':'100',
            }
      return self.signer.get_param_and_sign(action_param)

    # 修改解析记录
    def update_domain_record_params(self, data):
      action_param = {
                'Action':'UpdateDomainRecord',
                'RR': data['RR'],
                'RecordId': data['RecordId'],
                'Type': data['Type'],
                'Value' :data['Value'],
            }
      return self.signer.get_param_and_sign(action_param)

    # 获取解析记录信息
    def get_domain_record_info_params(self, RecordId):
      action_param = {
                'Action':'DescribeDomainRecordInfo',
                'RecordId': RecordId,
            }
      return self.signer.get_param_and_sign(action_param)

class AscClient(object):
    def __init__(self, AccessKeyId, AccessKeySecret):
      self.ip = PublicIP()
      self.client = AscRequestParam(AccessKeyId, AccessKeySecret)
      self.api = 'https://alidns.aliyuncs.com/'

    def _request(self, params):
      res = requests.get(self.api + params)
      return json.loads(res.content.decode('utf-8'))

    def _wait(self):
      timecount = TIME_INTERVAL
      while timecount >= 0:
            print(f"\r再次检测倒计时:{timecount}秒...", end="", flush=True)
            timecount = timecount - 1
            time.sleep(1)
      print('\n\n', end="", flush=True)

    def update_domain_record(self, record):
      ip = self.ip.ip_query()
      print(f'公网IP:{ip},')
      if ip == record['Value']:
            print('公网IP和解析IP相同,无需更新')
            return
      record['Value'] = ip
      params = self.client.update_domain_record_params(record)
      self._request(params)
      print('解析完成')

    def get_domain_records(self, RR, domain_name):
      params = self.client.get_domain_records_params(domain_name)
      res = self._request(params)
      TotalCount = int(res['TotalCount'])
      for i in res['DomainRecords']['Record']:
            if i['RR'] == RR:
                return i
      return None

    def get_domains(self):
      params = self.client.get_domains_params()
      res = self._request(params)
      domain_list = []
      for i in res['Domains']['Domain']:
            domain_list.append(i['DomainName'])
      return int(res['TotalCount']),domain_list

    def start(self,RR,domain_name):
      while True:
            count, domains = self.get_domains()
            if count <= 0 or not domain_name in domains:
                print('没有此域名')
                return
            record = self.get_domain_records(RR, domain_name)
            if not record:
                print(f'没有找到{RR}的解析记录')
                return
            if not record['Type'] == 'A':
                print('暂时只支持解析A记录')
                return
            if record['Status'] == 'DISABLE':
                print('此记录已被禁用,请启用后再试')
                return
            self.update_domain_record(record)
            self._wait()


if __name__ == '__main__':
    # 换成你自己的AccessKeyID, AccessKeySecret
    a = AscClient('**********','**********************')

    # 如果需要解析 www.baidu.com,填入 www 和 baidu.com
    # 如果需要解析 baidu.com填入 @ 和 baidu.com
    a.start('test','mydomain.net')

皮友 发表于 2021-1-20 00:15

我在linux系统中 使用python main.py 打开文件 总是报错 跪求解决

print(f"\r再次检测倒计时:{timecount}秒...", end="", flush=True)
                                                       ^
SyntaxError: invalid syntax

辣丝丝小白菜 发表于 2020-11-5 11:08

senooo 发表于 2020-11-4 13:36
这个是用来干嘛用的,是内网穿透吗 还是自己家里搭建服务 upnp DMZ

开了 公网IP后,可以把域名解析到自己的电脑/NAS。但是自己的网络IP地址不固定,经常换,这个就是检测公网IP 然后自动更新解析。

senooo 发表于 2020-11-4 13:36

这个是用来干嘛用的,是内网穿透吗 还是自己家里搭建服务 upnp DMZ

jhjhsxs 发表于 2020-11-4 13:45

这个有什么用呢,爬取淘宝数据吗?

小小欣 发表于 2020-11-4 17:50

电信的打客服电话30分钟直接就开通了不懂就问,这个 要怎么说呢

kantal 发表于 2020-11-4 21:42

…… 厉害的人物,来学习

辣丝丝小白菜 发表于 2020-11-5 11:05

小小欣 发表于 2020-11-4 17:50
电信的打客服电话30分钟直接就开通了不懂就问,这个 要怎么说呢

就说 要做监控 要开公网IP ,远程监控之类的

shd1971 发表于 2020-11-5 18:39

怎么用啊?没看明白

yek2furw 发表于 2021-1-20 08:51

这个很厉害,收藏了
页: [1] 2
查看完整版本: 【Python】阿里云DDNS动态域名解析,无需SDK