[Python] 纯文本查看 复制代码
import mysql.connector
import requests
import json
import random
import time
import datetime
def query():
USER_AGENT_LIST = [
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.90 Safari/537.36',
'Mozilla/5.0 (iPod; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5'
]
headers = {
'User-Agent': random.choice(USER_AGENT_LIST),
'Content-Type': 'application/json; charset=utf-8'
}
#数据库信息
mydb = mysql.connector.connect(
host="",
user="",
password="",
database=""
)
mycursor = mydb.cursor()
mycursor.execute("SELECT id, guide_cert_no FROM guide_info WHERE query_results IS NULL AND guide_cert_no IS NOT NULL")
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) #获取时间
print("导游正常查询:", current_time) #看是否执行查询
for row in mycursor.fetchall():
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print("导游正常查询及执行:", current_time) #看是否执行查询
id = row[0]
guide_cert_no = row[1]
print(f"第 {id}行, 导游证号: {guide_cert_no}")
data = {
'type': '3',
'value': guide_cert_no
}
print(data)
response = requests.post('https://mr.mct.gov.cn/data/guide/verify', headers=headers, data=json.dumps(data))
time.sleep(0.8)
result = json.loads(response.text)
print(result)
# 每遍历19次暂停42秒
if (id + 1) % 19 == 0:
print("暂停42秒...")
time.sleep(42)
if 'error' in result:
max_retry = 2 # 最大重试次数为2次
retry_count = 0 # 当前重试次数初始化为0
# 循环执行请求直到结果不出错或超过最大重试次数
while retry_count < max_retry:
response = requests.post('https://mr.mct.gov.cn/data/guide/verify', headers=headers, data=json.dumps(data))
time.sleep(0.8) # 每次请求之间间隔0.8秒
result = json.loads(response.text)
print(result)
if 'error' in result: # 如果结果中包含错误信息
retry_count += 1 # 当前重试次数加1
time.sleep(20) # 等待10秒后再次执行请求
else: # 如果结果中不包含错误信息
break # 跳出循环
else: # 如果循环执行了max_retry次仍然没有成功,则输出错误信息
result = json.loads(response.text)
reason = result['reason']
print(reason)
#获取时间并显示,写入的文件。
now = datetime.datetime.now() # 获取当前时间和日期
formatted_now = now.strftime("%Y-%m-%d %H:%M:%S") # 将时间格式化为"YYYY-MM-DD HH:MM:SS"格式
update_query = f"UPDATE guide_info SET query_results='{reason}', query_time='{formatted_now}' WHERE id={id}"
mycursor.execute(update_query)
mydb.commit()
else:
data = result['data']
if 'pic_avatar' in data:
pic_avatar = data['pic_avatar']
user_name = data['user_name']
star_level = data['star_level']
guide_card = data['guide_card']
guide_id = data['guide_id']
level = data['level']
leader_card = data['leader_card']
leader_type_name = data['leader_type_name']
languages = data['languages']
comp_name = data['comp_name']
qr = data['qr']
issued_date = data['issued_date']
expired_date = data['expired_date']
pinyin_name = data['pinyin_name']
qr_img_url = data['qr_img_url']
##查询数据库中导游的姓名
select_query = f"SELECT guide_name FROM guide_info WHERE id = {id}"
mycursor.execute(select_query)
result = mycursor.fetchone()
guide_name = result[0]
now = datetime.datetime.now() # 获取当前时间和日期
formatted_now = now.strftime("%Y-%m-%d %H:%M:%S") # 将时间格式化为"YYYY-MM-DD HH:MM:SS"格式
#写入到数据库
update_query = f"UPDATE guide_info SET query_results='查询成功', pic_avatar='{pic_avatar}', user_name='{user_name}', star_level='{star_level}', guide_card='{guide_card}', guide_id='{guide_id}', level='{level}', leader_card='{leader_card}', leader_type_name='{leader_type_name}', languages='{languages}',comp_name='{comp_name}', qr='{qr}', issued_date='{issued_date}', expired_date='{expired_date}', pinyin_name='{pinyin_name}', qr_img_url='{qr_img_url}', query_time='{formatted_now}' WHERE id={id}"
mycursor.execute(update_query)
mydb.commit()
if user_name and user_name == guide_name:
mycursor.execute(f"UPDATE guide_info SET check_name='姓名正确' WHERE id={id}")
#可以使用以下代码来查询当前行的 guide_id_card 值:
select_query = f"SELECT guide_id_card FROM guide_info WHERE id = {id}"
mycursor.execute(select_query)
result = mycursor.fetchone()
guide_id_card = result[0]
# 查询数据库中duplicate_queries列的所有值,排除空值
mycursor.execute("SELECT duplicate_queries FROM guide_info WHERE duplicate_queries IS NOT NULL AND duplicate_queries != ''")
myresult = mycursor.fetchall()
result_str = guide_name + guide_cert_no + guide_id_card
myresult_list = []
for row in myresult:
myresult_list.append(row[0])
result_str = guide_name + guide_cert_no + guide_id_card
if result_str in myresult_list:
mycursor.execute(f"UPDATE guide_info SET duplicate_queries = CONCAT('重复') WHERE id={id}")
print('重复')
mydb.commit()
else:
mycursor.execute(f"UPDATE guide_info SET duplicate_queries = CONCAT(guide_name, guide_cert_no, guide_id_card) WHERE id={id}")
mydb.commit()
#print('不重复')
# mycursor.execute(f"UPDATE guide_info SET duplicate_queries = CONCAT(guide_name, guide_cert_no, guide_id_card) WHERE id={id}")
#mydb.commit()
else:
tip = data['tip']
now = datetime.datetime.now() # 获取当前时间和日期
formatted_now = now.strftime("%Y-%m-%d %H:%M:%S") # 将时间格式化为"YYYY-MM-DD HH:MM:SS"格式
update_query = f"UPDATE guide_info SET query_results='{tip}', query_time='{formatted_now}' WHERE id={id}"
mycursor.execute(update_query)
mydb.commit()
# 更新last_processed_id和last_updated字段
update_query = "UPDATE guide_status SET last_processed_id = %s, last_updated = %s"
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
update_params = (id, now)
mycursor.execute(update_query, update_params)
mydb.commit()