Python+selenium批量下载微信公众号内的文字和图片
本帖最后由 lzmomo 于 2024-6-4 09:29 编辑初学python,希望大家多提议意见运行程序,输入网址,运行后会在path/to/save/路径下载文本和图片,文本以txt保存,文件名按照标题名称生成。
requests可以直接下载图片,但是不知道怎么下载文本,谁能教教我吗:loveliness:
# -*- coding:UTF-8 -*-
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from time import sleep
import requests
myoption = webdriver.ChromeOptions()
myoption.add_experimental_option("excludeSwitches", ["enable-automation"])
myoption.add_experimental_option('useAutomationExtension', False)
myoption.add_argument("--headless")# 无界面浏览
wd = webdriver.Chrome(service=Service(r'D:\chrome\chrome-win64\chromedriver.exe'), options=myoption)
wd.maximize_window()
wd.implicitly_wait(10)
url = input('输入网址:')
wd.get(url)
# 获取文本
span = wd.find_elements(By.XPATH, '//span[@style]')
title = wd.find_element(By.CLASS_NAME, 'rich_media_title').text
text_list = []
for text in span:
span_text = f"{text.text}"
text_list.append(span_text)
liebiao_text = text_list
merged_text = ''.join(liebiao_text)
new_merged_text = merged_text.replace('。', '\n')
# 打开文件
file = open('path/to/save/' + title + '.txt', "w", encoding='utf-8')
# 写入数据
file.write(new_merged_text)
# 关闭文件
file.close()
# 获取图片地址
pic = wd.find_elements(By.XPATH, '//img[@data-src]')
num = len(pic)
for num_i in range(num):
# print(num_i)
pic_url = pic.get_attribute("data-src")
response = requests.get(pic_url)
mingcheng = title + str(num_i)
with open('path/to/save/image_' + mingcheng + '.jpg', 'wb') as f:
# 写入获取到的内容
f.write(response.content)
print('正在下载第: ' + str(num_i) + '张图片')
sleep(0.5)
例如:https://mp.weixin.qq.com/s/C44gOkxm-Tf5JFWSlz_ezw
用我这个吧,完美版:
import ctypes
import subprocess
import sys
import os
import random
import re
import uuid
import shutil
import datetime
import requests
import secrets
from bs4 import BeautifulSoup
from qiniu import Auth, put_file, BucketManager, urlsafe_base64_encode
import qiniu.config
import mimetypes
import pyperclip
from datetime import datetime as dt
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QPushButton, QHBoxLayout, QLineEdit, QTextEdit
from PyQt5.QtCore import Qt, QSettings, QSize, QPoint, pyqtSignal, QThread
from PyQt5.QtGui import QDragEnterEvent, QDropEvent
# 隐藏命令行窗口
def hide_console():
hwnd = ctypes.windll.kernel32.GetConsoleWindow()
if hwnd:
ctypes.windll.user32.ShowWindow(hwnd, 0)
# 设置窗口位置和大小
def set_console_position_and_size(left, top, width, height):
# 获取控制台窗口句柄
hwnd = ctypes.windll.kernel32.GetConsoleWindow()
if hwnd:
# 调用 MoveWindow 函数来设置窗口位置和大小
ctypes.windll.user32.MoveWindow(hwnd, left, top, width, height, True)
# 设置窗口位置 (左, 上) 和大小 (宽度, 高度)
left = -7
top = 1506
width = 335
height = 200
# 设置命令行窗口的大小和位置
set_console_position_and_size(left, top, width, height)
# 七牛云配置信息
access_key = '手动填写'
secret_key = '手动填写'
bucket_name = '手动填写'
domain = '手动填写'
pipeline = 'default.sys'
key = 'l/tupian/'
fops = 'imageView2/0/interlace/1/q/90|imageslim'
uploaded_images = {}# 用于存储已经上传过的图片
q = Auth(access_key, secret_key)
bucket = BucketManager(q)
policy = {'persistentPipeline': pipeline}
if os.path.exists('.qiniu_pythonsdk_hostscache.json'):
os.remove('.qiniu_pythonsdk_hostscache.json')
class FetchImagesThread(QThread):
progress_signal = pyqtSignal(str)
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
self.fetch_images_from_url(self.url)
def fetch_images_from_url(self, url):
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
content = soup.find('div', {'id': 'js_content'})
if not os.path.exists('D:\\临时图片'):
os.makedirs('D:\\临时图片')
img_re = re.compile(r'<img [^<>]*src=["\']?([^"\'>]+)["\']?[^<>]*>')
img_list = img_re.findall(str(content))
bg_img_re = re.compile(r'background-image:\s*url\(["\']?([^"\'>]+)["\']?\)')
bg_img_list = bg_img_re.findall(str(content))
img_list.extend(bg_img_list)
for index, img_url in enumerate(img_list):
if img_url in uploaded_images:
img_new_url = uploaded_images
else:
response = requests.head(img_url)
content_type = response.headers.get('content-type')
img_type = content_type.split('/')[-1]
if img_type == 'jpeg':
img_ext = '.jpg'
elif img_type == 'png':
img_ext = '.png'
elif img_type == 'gif':
img_ext = '.gif'
elif img_type == 'svg+xml':
img_ext = '.svg'
else:
img_ext = '.jpg'
new_img_url = re.sub(r'\.\w+$', img_ext, img_url)
img_data = requests.get(new_img_url).content
img_name = f"D:\\临时图片\\{secrets.token_hex(4)}{img_ext}"
with open(img_name, 'wb') as f:
f.write(img_data)
today = datetime.datetime.now().strftime('%Y/%m-%d')
title = url.split('/')[-1].split('.')
img_key = f"{key}{today}/{title}/{secrets.token_hex(4)}{img_ext}"
token = q.upload_token(bucket_name, img_key)
ret, info = put_file(token, img_key, img_name, check_crc=True)
img_new_url = f"{domain}/{img_key}"
uploaded_images = img_new_url
content = re.sub(r'src=["\']?' + re.escape(img_url) + r'["\']?', f'src="{img_new_url}"', str(content))
content = re.sub(r'background-image:\s*url\(["\']?' + re.escape(img_url) + r'["\']?\)', f'background-image: url("{img_new_url}")', str(content))
self.progress_signal.emit(f"替换后的链接: {img_new_url}")
content = re.sub(r'data-src', 'src', content)
timestamp = datetime.datetime.now().strftime('%H.%M.%S')
file_name = f"C:/Users/Administrator/Desktop/采集_{timestamp}.html"
with open(file_name, 'w', encoding='utf-8') as f:
# 删除生成的无用代码段
cleaned_content = re.sub(r'style="display: none;"', '', str(content))
f.write(cleaned_content)
self.progress_signal.emit(f"文件已保存至:{file_name}")
shutil.rmtree('D:\\临时图片')
self.progress_signal.emit("下载的文件已删除")
if os.path.exists('.qiniu_pythonsdk_hostscache.json'):
os.remove('.qiniu_pythonsdk_hostscache.json')
class DragDropWidget(QWidget):
def __init__(self):
super().__init__()
self.init_ui()
self.setAcceptDrops(True)
self.last_file_is_video = False
# 设置窗口位置到左下角
screen = QApplication.primaryScreen().availableGeometry()
self.move(screen.left(), screen.bottom() - self.height() - 276)
# 读取上次保存的窗口位置和大小
settings = QSettings('MyApp', 'DragDropWidget')
self.resize(settings.value('size', QSize(400, 300)))
self.move(settings.value('pos', QPoint(300, 300)))
def init_ui(self):
self.setWindowTitle('图片+视频+公众号采集')
self.setGeometry(300, 300, 400, 300)# 窗口大小设置为400x300
layout = QVBoxLayout()
self.url_input = QLineEdit(self)
self.url_input.setPlaceholderText('请输入公众号文章的链接')
layout.addWidget(self.url_input)
self.fetch_button = QPushButton('点击采集', self)
self.fetch_button.clicked.connect(self.fetch_images)
layout.addWidget(self.fetch_button)
self.label = QLabel('请拖拽文件或文件夹到此窗口进行上传', self)
self.label.setAlignment(Qt.AlignCenter)
layout.addWidget(self.label)
self.result_label = QLabel('', self)
self.result_label.setAlignment(Qt.AlignCenter)
layout.addWidget(self.result_label)
self.progress_text = QTextEdit(self)
self.progress_text.setReadOnly(True)
layout.addWidget(self.progress_text)
self.size_label = QLabel('当前默认尺寸:宽1200px', self)
self.size_label.setAlignment(Qt.AlignCenter)
layout.addWidget(self.size_label)
button_layout = QHBoxLayout()
self.button_1200px = QPushButton('宽1200px', self)
self.button_1200px.clicked.connect(lambda: self.set_size_option('1200px'))
self.button_original = QPushButton('原始尺寸', self)
self.button_original.clicked.connect(lambda: self.set_size_option('原始尺寸'))
button_layout.addWidget(self.button_1200px)
button_layout.addWidget(self.button_original)
layout.addLayout(button_layout)
self.setLayout(layout)
self.size_option = '1200px'
def fetch_images(self):
url = self.url_input.text().strip()
if url:
self.progress_text.append("开始采集图片...")
self.thread = FetchImagesThread(url)
self.thread.progress_signal.connect(self.update_progress)
self.thread.start()
def set_size_option(self, option):
self.size_option = option
self.size_label.setText(f'当前选择:{option}')
if option == '1200px':
self.button_1200px.setStyleSheet('background-color: lightblue')
self.button_original.setStyleSheet('')
else:
self.button_original.setStyleSheet('background-color: lightblue')
self.button_1200px.setStyleSheet('')
def dragEnterEvent(self, event):
if event.mimeData().hasUrls():
event.acceptProposedAction()
def dropEvent(self, event):
uploaded_files = []
for url in event.mimeData().urls():
file_path = url.toLocalFile()
mime_type, _ = mimetypes.guess_type(file_path)
if os.path.isdir(file_path):
# 如果是文件夹,遍历文件夹内所有文件并上传
for root, dirs, files in os.walk(file_path):
for file in files:
file_path = os.path.join(root, file)
result = self.upload_file_or_video(file_path)
if result:
uploaded_files.append(result)
else:
# 如果是文件,直接上传
result = self.upload_file_or_video(file_path)
if result:
uploaded_files.append(result)
if uploaded_files:
self.result_label.setText(f'上传成功,共上传 {len(uploaded_files)} 个文件')
self.generate_html(uploaded_files)
else:
self.result_label.setText('上传失败,请重试')
def upload_file_or_video(self, file_path):
key_prefix = 'l/tupian/' if file_path.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')) else 'l/shipin/'
key = key_prefix + dt.now().strftime('%Y/%m-%d') + '/' + str(random.randint(10000000, 99999999)) + os.path.basename(file_path)
saveas_key = urlsafe_base64_encode(f'{bucket_name}:{key}')
if key_prefix == 'l/tupian/':
fops = 'imageView2/0/interlace/1/q/90|imageslim|saveas/' + saveas_key
else:
fops = 'vframe/jpg/offset/0.1/saveas/' + saveas_key
policy['persistentOps'] = fops
token = q.upload_token(bucket_name, key, 3600, policy=policy)
ret, info = put_file(token, key, file_path, version='v2')
if info.status_code == 200:
self.update_progress(f'{key} {"图片" if key_prefix == "l/tupian/" else "视频"}上传成功')
return f'{domain}/{key}'
return None
def generate_html(self, uploaded_files):
desktop_path = os.path.join(os.path.expanduser("~"), "Desktop")
html_path = os.path.join(desktop_path, 'uploaded_files.html')
with open(html_path, 'w', encoding='utf-8') as f:
f.write('<style style="text-align: center;"></style><p>')
for i, file in enumerate(uploaded_files):
if file.endswith(('.jpg', '.jpeg', '.png', '.gif')):
if self.size_option == '1200px':
f.write(f'<img src="{file}" style="width: 1200px;"/>')
else:
f.write(f'<img src="{file}" style="width: auto;"/>')
elif file.endswith(('.mp4', '.avi', '.mkv', '.mov')):
if not self.last_file_is_video:
f.write(f'<video poster="{file}?vframe/jpg/offset/0.1" src="{file}" controls="controls" style="width:100%; height:100%;" autoplay="autoplay" webkit-playsinline="true" playsinline="true" x5-playsinline="true"></video></p>')
# Copy the video code to the clipboard
pyperclip.copy(f'<p><video poster="{file}?vframe/jpg/offset/0.1" src="{file}" controls="controls" style="width:100%; height:100%;" autoplay="autoplay" webkit-playsinline="true" playsinline="true" x5-playsinline="true"></video></p>')
self.last_file_is_video = True
self.update_progress(f'已生成HTML文件:{html_path}')
if os.path.exists('.qiniu_pythonsdk_hostscache.json'):
os.remove('.qiniu_pythonsdk_hostscache.json')
def update_progress(self, message):
self.progress_text.append(message)
def closeEvent(self, event):
# 保存窗口位置和大小
settings = QSettings('MyApp', 'DragDropWidget')
settings.setValue('size', self.size())
settings.setValue('pos', self.pos())
event.accept()
if __name__ == '__main__':
hide_console()
app = QApplication(sys.argv)
widget = DragDropWidget()
widget.show()
sys.exit(app.exec_())
response.text这是返回的文本对象,然后用write写入到文件就行。注意下编码问题 要在Python中保存文本,你可以使用以下代码:
```python
with open("文件名.txt", "w", encoding="utf-8") as f:
f.write("要保存的文本内容")
```
将"文件名.txt"替换为你想要保存的文件名,将"要保存的文本内容"替换为你想要保存的文本。 不能下载里面的视频吗? 你在哪里自学的教程推介一下。 import tkinter as tk
from tkinter import filedialog, messagebox
from selenium import webdriver
from selenium.webdriver.edge.service import Service
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from time import sleep
import requests
import os
def download_content():
# 获取用户输入的 URL
url = url_entry.get()
if not url:
messagebox.showerror("错误", "请输入网址")
return
# 获取保存路径
save_path = filedialog.askdirectory()
if not save_path:
messagebox.showerror("错误", "请选择保存路径")
return
# 确保保存路径存在
os.makedirs(save_path, exist_ok=True)
# 使用 requests 获取网页内容
response = requests.get(url)
# 检查请求是否成功
if response.status_code == 200:
# 解析 HTML
soup = BeautifulSoup(response.content, 'html.parser')
# 提取文本内容
paragraphs = soup.find_all('p')
text_list =
# 合并文本
merged_text = '\n'.join(text_list)
# 获取网页标题
try:
title = soup.title.string.strip() if soup.title else 'default_title'
except AttributeError:
title = 'default_title'
messagebox.showwarning("提示", "该网页可能没有明确的标题元素")
# 保存文本到文件
with open(os.path.join(save_path, f'{title}.txt'), 'w', encoding='utf-8') as file:
file.write(merged_text)
print('文本已保存')
# 设置浏览器选项
myoption = webdriver.EdgeOptions()
myoption.add_experimental_option("excludeSwitches", ["enable-automation"])
myoption.add_experimental_option('useAutomationExtension', False)
myoption.add_argument("--headless")# 无界面浏览
try:
wd = webdriver.Edge(service=Service(r'C:\Program Files (x86)\Microsoft\Edge\Application\edgedriver_win64\msedgedriver.exe'), options=myoption)
wd.maximize_window()
wd.implicitly_wait(10)
# 获取图片地址并下载
wd.get(url)
pic = wd.find_elements(By.XPATH, '//img[@data-src]')
num = len(pic)
for num_i in range(num):
pic_url = pic.get_attribute("data-src")
response = requests.get(pic_url)
mingcheng = title + str(num_i)
with open(os.path.join(save_path, f'image_{mingcheng}.jpg'), 'wb') as f:
f.write(response.content)
print('正在下载第:' + str(num_i) + '张图片')
sleep(0.5)
# 关闭浏览器
wd.quit()
except Exception as e:
messagebox.showerror("错误", f'浏览器操作出错: {e}')
messagebox.showinfo("成功", "文本和图片已保存")
else:
messagebox.showerror("错误", f'请求失败,状态码:{response.status_code}')
# 创建主窗口
root = tk.Tk()
root.title("网页内容下载器")
# 创建并放置控件
tk.Label(root, text="请输入网址:").grid(row=0, column=0, padx=10, pady=10)
url_entry = tk.Entry(root, width=50)
url_entry.grid(row=0, column=1, padx=10, pady=10)
download_button = tk.Button(root, text="下载内容", command=download_content)
download_button.grid(row=1, columnspan=2, pady=10)
# 运行主循环
root.mainloop() 捷豹网络丶贱仔 发表于 2024-6-4 11:12
import tkinter as tk
from tkinter import filedialog, messagebox
from sele ...
感谢感谢,认真学习了 青春、浮华梦 发表于 2024-6-4 12:52
你在哪里自学的教程推介一下。
我没有系统的学呢,都是在csdn上搜问题,找解决办法。