一直在追的几部动漫,这个网站更新的比较快,所以就写段代码来监控一下,今天2024.4.26日以下动漫的最新集数:
[HTML] 纯文本查看 复制代码 {"遮天动画版": "54", "吞噬星空": "116", "仙逆": "33", "斗破苍穹 年番": "93", "诛仙动画版": "32", "完美世界": "160"}
监控的PYTHON代码,我放到VPS的宝塔上,设置计划任务半小时监控一次,
用JSON文件来存储动漫名称和集数,当有更新就会发送信息到TG提醒并更新JSON文件中的集数
[Python] 纯文本查看 复制代码 import requests
import re
from lxml import etree
from telegram import Bot
import asyncio
import json
# Telegram Bot Token
TOKEN = '你的TOKEN'
# Telegram Chat ID
CHAT_ID = '你的CHAT_ID '
# 存储每个动漫的上次 episode_number 和 base_url 的文件路径
EPISODES_FILE = 'hainatv.json'
def get_episode_number(text):
match = re.search(r'第(\d+)集', text)
if match:
return match.group(1)
else:
return "未找到数字"
def get_base_url(keyword):
url = f'https://www.hainatv.net/index.php/vod/search.html?wd={keyword}'
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36',
'Referer': 'https://www.hainatv.net'
}
try:
response = requests.get(url, headers=header, timeout=10)
response.raise_for_status() # 检查响应状态
html = etree.HTML(response.text)
link = html.xpath(f'//div[@class="hl-item-content"]//a[@title="{keyword}"]/@href')
if link:
base_url = 'https://www.hainatv.net' + link[0]
return base_url
else:
print(f"No link found for {keyword}")
return None
except requests.exceptions.Timeout:
print("请求超时")
return None
except requests.exceptions.RequestException as e:
print(f"请求发生错误: {e}")
return None
async def send_telegram_message(message):
bot = Bot(token=TOKEN)
await bot.send_message(chat_id=CHAT_ID, text=message)
def read_previous_episodes():
try:
with open(EPISODES_FILE, 'r', encoding='utf-8') as file:
previous_episodes = json.load(file)
return previous_episodes
except FileNotFoundError:
return {}
def write_previous_episodes(previous_episodes):
with open(EPISODES_FILE, 'w', encoding='utf-8') as file:
json.dump(previous_episodes, file, ensure_ascii=False)
async def check_for_updates():
previous_episodes = read_previous_episodes()
updated_episodes = {}
for keyword in previous_episodes.keys():
base_url = get_base_url(keyword)
if base_url is None:
continue # 如果获取 base_url 失败,则跳过当前动漫的检查
try:
response = requests.get(base_url, timeout=10)
response.raise_for_status() # 检查响应状态
base_html = etree.HTML(response.text)
muted = base_html.xpath('//span[@class="hl-text-conch"]/text()')[0]
episode_number = get_episode_number(muted)
if previous_episodes[keyword] is None or episode_number != previous_episodes[keyword]:
message = f"{keyword} 当前更新至第{episode_number}集\n{base_url}"
await send_telegram_message(message)
updated_episodes[keyword] = episode_number
except requests.exceptions.Timeout:
print("请求超时")
continue
except requests.exceptions.RequestException as e:
print(f"请求发生错误: {e}")
continue
# 只更新有更新的动漫集数信息到文件中
if updated_episodes:
previous_episodes.update(updated_episodes)
write_previous_episodes(previous_episodes)
if __name__ == "__main__":
asyncio.run(check_for_updates())
|