[Python] 纯文本查看 复制代码
import json
import datetime
import time
import traceback
import click as click
from tencentcloud.common import credential
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.dnspod.v20210323 import dnspod_client, models
SECRET_ID = ""
SECRET_KEY = ""
def try_catch_and_score_time(fn):
def _wrapper(*args, **kwargs):
start = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"{start} start ddns.")
ret = None
try:
ret = fn(*args, **kwargs)
except TencentCloudSDKException as e:
print(f"{traceback.format_exc()} error: {e}")
end = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"{end} program exit now.")
return ret
return _wrapper
class DDNS:
def __init__(self, domain, sub_domain="", ipv6=True, check_ip_fmt=False):
self._domain = domain
self._sub_domain = sub_domain if len(sub_domain) > 0 else ("ipv6" if ipv6 else "ipv4")
self._is_ipv6 = ipv6
self._record_type = "AAAA" if self._is_ipv6 else "A" # 待修改解析记录的解析类型,AAAA为解析到ipv6的记录类型
self._client = self.get_tencentcloud_client()
self._check_ip_format = check_ip_fmt
@try_catch_and_score_time
def dynamic_modify_dns(self):
"""
默认DDNS到ipv6地址
"""
if self._client is None:
return
# 1. 获取域名下的所有解析记录
req = models.DescribeRecordRequest()
params = {
"Domain": self._domain
}
req.from_json_string(json.dumps(params))
resp = self._client.DescribeRecordList(req)
# with open("record_list.json", "w") as f:
# f.write(resp.to_json_string())
record_list = resp.RecordList
if len(record_list) <= 0:
print(f"RecordList length is 0. service will exit.")
return
# 2. 过滤解析记录列表,拿到需要修改的记录
record_need_to_modify: models.RecordListItem = None
for record in record_list:
name = record.Name
record_type = record.Type
if self._sub_domain != name or self._record_type != record_type:
continue
record_need_to_modify = record
old_value = None
if record_need_to_modify is None:
print(f"{self._domain} doesn't have {self._sub_domain} {self._record_type} sub domain. "
f"to create {self._sub_domain}.{self._domain} {self._record_type} record.")
mod_record_id = self.create_dns_record()
else:
mod_record_id = record_need_to_modify.RecordId
old_value = record_need_to_modify.Value
# 3. 修改解析记录到动态ip上
dst_ip = self.get_dst_ip_for_dns_record(is_ipv6=self._is_ipv6, check=self._check_ip_format)
if dst_ip is None:
print(f"can't get new ip for ddns.")
return
if old_value == dst_ip:
print(f"value doesn't need to update. value: {old_value}.")
return
req = models.ModifyDynamicDNSRequest()
params = {
"Domain": self._domain,
"SubDomain": self._sub_domain, # 如果不传,默认为 @
"RecordId": mod_record_id,
"RecordLine": "默认",
"Value": dst_ip
}
print(params)
req.from_json_string(json.dumps(params))
resp = self._client.ModifyDynamicDNS(req)
if str(mod_record_id) in resp.to_json_string():
print("successfully update ddns record!")
@staticmethod
def get_tencentcloud_client():
for cnt in range(3):
try:
cred = credential.Credential(SECRET_ID, SECRET_KEY)
httpProfile = HttpProfile()
httpProfile.endpoint = "dnspod.tencentcloudapi.com"
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
client = dnspod_client.DnspodClient(cred, "", clientProfile)
return client
except TencentCloudSDKException as e:
print(f"program get tencent cloud client error for {cnt}: {e}.")
time.sleep(cnt + 1)
return None
def create_dns_record(self):
req = models.CreateRecordRequest()
params = {
"Domain": self._domain,
"SubDomain": self._sub_domain,
"RecordType": self._record_type,
"RecordLine": "默认",
"Value": "::1" if self._is_ipv6 else "127.0.0.1"
}
req.from_json_string(json.dumps(params))
resp = self._client.CreateRecord(req)
return resp.RecordId
def get_dst_ip_for_dns_record(self, is_ipv6, check=True):
"""
获取解析到的ip
:return:
"""
# 1. 第一种方法,向免费公共方法获取目前的外网IP
import requests
url = f"https://api{6 if is_ipv6 else ''}.ipify.org?format=json"
r = requests.get(url, timeout=5)
ip = json.loads(r.text).get('ip', None)
if check:
ip = self.check_ip_and_format(ip)
return ip
def check_ip_and_format(self, ipv6: str):
if ipv6 == None:
return None
try:
import IPy
v = 6 if self._is_ipv6 else 4
IP = IPy.IP(ipv6)
IP.iptype()
# if IP.version == v and IP.iptype() in ['ALLOCATED APNIC']:
if IP.version() == v:
return str(IP)
except Exception as e:
print(e)
return None
# * * * * * python dynamic_modify_dns.py -domain=xxx.top -sub_domain=ipv6 -is_ipv6 -check_ip_fmt > /tmp/ddns.log
@click.command()
@click.option('-domain', required=True, default="", help="个人拥有的域名")
@click.option('-sub_domain', default="ipv6", help="设置的二级域名")
@click.option('-is_ipv6', is_flag=True, default=False, help="是否DDNS到ipv6上")
@click.option('-check_ip_fmt', is_flag=True, default=False, help="检查IP格式")
def cmd(domain, sub_domain, is_ipv6, check_ip_fmt):
DDNS(domain=domain, sub_domain=sub_domain,
ipv6=is_ipv6, check_ip_fmt=check_ip_fmt).dynamic_modify_dns()
if __name__ == '__main__':
cmd()