[Python] 纯文本查看 复制代码 import csv
import requests
import logging
import json
from datetime import datetime
import time
# Initialize logging
logging.basicConfig(level=logging.INFO)
def fetch_hot_searches():
url = "https://weibo.com/ajax/statuses/hot_band"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.62'
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status() # Raises HTTPError for bad responses
data = response.json()["data"]["band_list"]
hot = []
count = 0
for item in data:
count += 1
note = item.get('note', 'Unknown')
category = item.get('category', 'Unknown')
raw_hot = item.get('raw_hot', 'Unknown')
hot_url = f'https://s.weibo.com/weibo?q={note}&Refer=index'
onboard_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
hot.append([count, note, category, raw_hot, hot_url, onboard_time])
with open('weibo_hot.csv', 'w', newline='', encoding="utf-8") as file:
writer = csv.writer(file)
writer.writerow(["id", "note", "category", "raw_hot", "url"])
# 遍历列表中的每个元素,写入 CSV 文件的单独一行中
for item in hot:
writer.writerow(item, )
send_msg("热搜实时榜单", str(hot).replace("[","").replace("]","").replace("'",""))
except requests.RequestException as e:
logging.error(f"Request failed: {e}")
except KeyError as e:
logging.error(f"Key error: {e}")
except json.JSONDecodeError as e:
logging.error(f"JSON decode error: {e}")
def send_msg(head, body):
logging.info('----------start send_msg')
url = 'http://push.ijingniu.cn/send'
data = {
'key': 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', # Replace with your actual key
'head': head,
'body': body
}
try:
response = requests.post(url=url, data=data)
response.raise_for_status()
logging.info(response.text)
if response.ok:
return json.loads(response.text)['status']
else:
return False
except requests.RequestException as e:
logging.error(f"Request failed: {e}")
return False
except json.JSONDecodeError as e:
logging.error(f"JSON decode error: {e}")
return False
while True:
fetch_hot_searches()
time.sleep(3600) # 1个小时推送一次
#fetch_hot_searches()
上面代码替换自己的秘钥
|