话痨司机啊, posted on 2022-6-29 10:59
Last edited by 话痨司机啊 on 2022-7-5 19:36
I studied medicine, so I follow quite a few health-related WeChat official accounts, and some of the material I use at work comes from them as well. I happened to come across a post here about downloading an official account's article history, and following that approach I wrote a Python version of my own, mainly to download the audio an account has published. I hope it is useful to anyone with a similar need.
The analysis tools used along the way: Fiddler, SQLiteManager, Postman, Apifox.
I also exported the data of the account "某中医药"; it contains some prescriptions that may help people with chronic conditions. The links inside can be opened by copying them directly. The export is in Excel format.
wx.zip (1.34 MB, downloaded 300 times)
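For anyone who wants to build a similar export from their own crawl, a minimal sketch along these lines should work. This is not the exact step used to produce wx.zip; it assumes the wxgz.db database and t_zhengdan table created by the scripts further down, and it writes a UTF-8 CSV that Excel opens directly rather than a native .xlsx file.
[Python]
import csv
import sqlite3
from pathlib import Path

# dump the scraped article list to a CSV next to this script
db = Path(__file__).parent.joinpath('wxgz.db')
conn = sqlite3.connect(db)
rows = conn.execute("SELECT DATE_TIME, TITLE, CONTENT_URL FROM t_zhengdan").fetchall()
conn.close()

with open('wx_export.csv', 'w', newline='', encoding='utf-8-sig') as f:
    writer = csv.writer(f)
    writer.writerow(['DATE_TIME', 'TITLE', 'CONTENT_URL'])
    writer.writerows(rows)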
Results (screenshots):
Crawling in progress, 1629 pages in total
Database structure
After data cleaning, the useful links are extracted and the audio is downloaded
Audio download finished
Official account audio:
Crawling and ingestion script:
[Python]
"""
/// Legacy endpoint for an official account's post history
https://mp.weixin.qq.com/mp/profile_ext?action=getmsg&__biz={biz}&f=json&offset={offset}&count=10&is_ok=1&scene=124&uin={uin}&key={key}&pass_ticket={pass_ticket}&wxtoken=&appmsg_token={appmsg_token}&x5=0&f=json
The URL above is where the article list comes from. Its parameters fall into three groups (marked red, blue and pink in the original post):
key, pass_ticket and appmsg_token are issued on each visit and stay valid for about half an hour
__biz and uin are fixed: the same for a given account and a given visitor
offset is the paging position, starting at 0 and advancing 10 at a time: page 1 is offset 0, page 2 is 10, page 3 is 20, and so on
Note: one item counted by offset is one push, and a single push may contain one or several articles.
Once one such request has been captured, extract {biz}, {uin}, {key}, {pass_ticket} and {appmsg_token}; after that only {offset} needs to change in order to read every article.
Taking the 吾爱破解论坛 account as the example again, the earliest push can be found by adjusting offset.
After repeated tests, offset 893 turned out to be the earliest article; at 894 there is nothing left.
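Side note (an assumption, not something verified in this post): the getmsg JSON is commonly reported to also carry next_offset and can_msg_continue fields; when they are present, the end of the history can be detected directly instead of probing offsets by hand, e.g.
    data = requests.get(base_url, headers=headers).json()
    if data.get('can_msg_continue') == 0:   # no older pushes left -> stop
        ...
    offset = data.get('next_offset', offset + 10)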
----------------------------------------------------------------------------------------------------------------------------------------
/// How the audio is obtained
<mpvoice frameborder="0" class="res_iframe js_editor_audio audio_iframe"
src="/cgi-bin/readtemplate?t=tmpl/audio_tmpl&name=20180704%E8%8A%82%E7%9B%AE%E5%9B%9E%E9%A1%BE2%20%E8%82%9D%E7%99%8C&play_length=29:09"
isaac2="1" low_size="3407.11" source_size="3379.2" high_size="13669.02"
name="20180704节目回顾2 肝癌" play_length="1749000"
voice_encode_fileid="MzA4MzI1NDcyN18yNjUzMTIzNzI1"></mpvoice>
et = etree.HTML(res.text)
voice_encode_fileid = et.xpath("//mpvoice[@class='res_iframe js_editor_audio audio_iframe']/@voice_encode_fileid")[0]
ffmpeg -> https://res.wx.qq.com/voice/getvoice?mediaid={voice_encode_fileid}  # the audio itself is served from this URL
play_length = 1749000  # taken from the play_length attribute of the tag above
play_length = time.strftime("%H:%M:%S", time.gmtime(int(play_length))).replace(':', '_')  # duration string used in the file name
name = name.replace(" ", "+")  # audio title, spaces replaced
title = name + ' ' + play_length + '.mp3'
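Put together, the ffmpeg call built by the download script below is roughly equivalent to this command line (the mediaid is the voice_encode_fileid from the sample tag above, and the output name is whatever title + duration string the script assembles):
    ffmpeg -y -timeout 10000000 -user_agent "<the WeChat UA used in the script>" -i "https://res.wx.qq.com/voice/getvoice?mediaid=MzA4MzI1NDcyN18yNjUzMTIzNzI1" "output.mp3"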
-----------------------------------------------------------------------------------------------------------------------------------------
_biz:MzA4MzI1NDcyNw== 2022/6/30 10:02:37
_uin:MTAxNTI0MjI2Mw%3D%3D 2022/6/30 10:02:37
_key:45f73879b4e8a7788f8ed345d876e0698d71edb4c50c6b47f01e33b6844c7259cee75ff66e875b8925eba0a4e70c77e9b124068856fd4c982ef4f0e7959fd079a33a0480ed392d1c72358d88c84aca1ff574e57f4384c6a795432a4e686770ffb088378de347416b1c2713af2a8ae591936d047e8f3712646a0b79078429a91c 2022/6/30 10:02:37
_pass_ticket:i3LKKpAvxyb1I5dx8zKgcrsTpflI51%2B6AcdWLWSBNay8OypDc2rSdwG6ZAXqBDim 2022/6/30 10:02:37
_nickname:郑丹看医生 2022/6/30 10:02:37
_appmsg_token:1172_EbbxVNd0uUtdDKHH7TtooywkKtr24_ZmUa7THR1RNm-Eh0zPirkS2OZJjl5zn5mhFUyb5kqwK7sGQCVX 2022/6/30 10:02:38
"""
import json
import re
import subprocess
import time
from pathlib import Path

import requests
from loguru import logger
from lxml import etree
from rich import print

from sqlite_parse import *

logger.add('wx.log', level='INFO')
def fileNameFiltering(title):
    '''
    Strip characters that are not allowed in file names
    '''
    title = title.strip()
    title = re.sub(r'[\/:*?"<>|]', '-', title)
    return title
# 郑丹看医生: the offset upper bound is 1629
def obtaining_json_information(public_number, offset, visitor_number, key, pass_ticket, appmsg_token):
    """
    Fetch one page (10 pushes) of the article list and hand it to process_the_data
    """
    global headers
    base_url = f"https://mp.weixin.qq.com/mp/profile_ext?action=getmsg&__biz={public_number}&f=json&offset={offset}&count=10&is_ok=1&scene=124&uin={visitor_number}&key={key}&pass_ticket={pass_ticket}&wxtoken=&appmsg_token={appmsg_token}&x5=0&f=json"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36 NetType/WIFI MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x6307001e)"
    }
    jres = requests.get(base_url, headers=headers, timeout=5)
    process_the_data(jres.json())
    offset_insert_create(table_offset, offset, query=False)  # mark this offset as done
    print(f'offset {offset} written to the database')
def parameters_to_main():
    """
    Fill in the captured parameters and walk through every offset
    """
    public_number = "*****=="  # fixed: __biz of the account
    visitor_number = "MTAxNT*********D%3D"  # fixed: uin of the visitor
    key = "45f738******25eba0a4e70c77e9b124068856fd4c982ef4f0e7959fd079a33a0480ed392d1c72358d88c84aca1ff574e57f4384c6a795432a4e686770ffb088378de347416b1c2713af2a8ae591936d047e8f3712646a0b79078429a91c"  # valid for about half an hour
    pass_ticket = "i3LKKpAvxyb1I5dx8zsypDc2rSdwG6Z*******AXqBDim"  # valid for about half an hour
    appmsg_token = "1172_EbbxVNd0u*****hFUyb5kqwK7sGQCVX"  # valid for about half an hour
    offers = 1629  # last offset for this account
    for offset in range(60, offers + 1, 10):  # starts from 60 here; use 0 to begin at the newest push
        time.sleep(2)
        if not offset_insert_create(table_offset, offset, query=True):
            obtaining_json_information(public_number, offset, visitor_number, key, pass_ticket, appmsg_token)
        else:
            print(f'offset {offset} already fetched, skipping')
def process_the_data(data):
    """
    Parse one page of the response and write every article into the database
    """
    try:
        # general_msg_list is itself a JSON string inside the response
        for i in json.loads(data.get('general_msg_list')).get('list'):
            date_time = time.strftime("%Y-%m-%d", time.gmtime(int(i.get('comm_msg_info').get('datetime'))))
            if date_time:
                insert_data(table_name, date_time, i.get('app_msg_ext_info').get('title'), i.get('app_msg_ext_info').get('content_url'))
                # a single push may carry several articles
                if i.get('app_msg_ext_info').get('multi_app_msg_item_list') is not None:
                    for x in i.get('app_msg_ext_info').get('multi_app_msg_item_list'):
                        insert_data(table_name, date_time, x['title'], x['content_url'])
    except Exception as e:
        logger.warning(e)
def main():
    '''
    Entry point: create the table, then crawl
    '''
    global table_name, table_offset
    table_name = 'zhengdan'
    table_offset = "offset"
    creat_table(table_name)
    parameters_to_main()

if __name__ == "__main__":
    main()
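Because key, pass_ticket and appmsg_token go stale after roughly half an hour, it can save time to confirm they still work before looping over all 1629 offsets. A small sketch of such a check follows; it is not part of the original script and assumes that a stale key makes the endpoint return an empty or missing general_msg_list rather than an HTTP error.
[Python]
import requests

def parameters_still_valid(base_url, headers):
    """Fetch one page and check whether an article list is still being returned."""
    try:
        data = requests.get(base_url, headers=headers, timeout=5).json()
    except Exception:
        return False
    # assumption: a stale key yields a response without a usable general_msg_list
    return bool(data.get('general_msg_list'))

# usage: build base_url with offset=0 exactly as obtaining_json_information does,
# then stop early if parameters_still_valid(base_url, headers) is False.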
[Python]
"""
音频下载
"""
import requests
from pathlib import Path
from rich import print
import requests
from loguru import logger
from lxml import etree
from sqlite_parse import *
import subprocess
import time
logger.add('wx.log', level='INFO')
table_name = 'zhengdan'
def requestDetailsPage(the_article_links, the_article_date, the_article_title):
    '''
    Request the article page and, if it embeds audio, download it
    '''
    global headers
    headers = {
        "User-Agent":
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36 NetType/WIFI MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x6307001e)"
    }
    html = requests.get(the_article_links, headers=headers)
    _datas = filterData(the_article_links, html)
    if _datas is not None:
        video_title, voice_encode_fileid = _datas
        processSaveAudio(the_article_date, the_article_title, video_title, voice_encode_fileid)
def filterData(the_article_links, html):
    '''
    Extract the audio download information from an article page
    '''
    try:
        et = etree.HTML(html.text)
        # class values seen on <mpvoice> across different articles:
        # rich_pages res_iframe js_editor_audio audio_iframe
        # //mpvoice[@class='rich_pages res_iframe js_editor_audio audio_iframe place_audio_area']/
        # |//mpvoice[@class='js_editor_audio audio_iframe res_iframe js_uneditable custom_select_card']/
        voice_encode_fileid = et.xpath("//mpvoice/@voice_encode_fileid")[0]
        play_length = et.xpath("//mpvoice/@play_length")[0]
        # format the duration for use in the file name
        play_length = time.strftime("%H:%M:%S", time.gmtime(int(play_length))).replace(':', '_')
        name = et.xpath("//mpvoice/@name")[0]
        name = name.replace(" ", "+")
        video_title = name + " " + play_length
        return video_title, voice_encode_fileid
    except Exception as e:
        logger.error(f'Problem URL: {the_article_links}, error: {e}')
def processSaveAudio(the_article_date, the_article_title, video_title, voice_encode_fileid):
    '''
    Download the audio with ffmpeg and save it as an mp3
    '''
    _dir = Path(__file__).parent.joinpath(the_article_date).joinpath(the_article_title)  # target directory
    _dir.mkdir(parents=True, exist_ok=True)
    _ffmpeg = Path(__file__).parent.joinpath('ffmpeg.exe')
    base_url = lambda voice_encode_fileid: f"https://res.wx.qq.com/voice/getvoice?mediaid={voice_encode_fileid}"
    cmd = [
        str(_ffmpeg), "-y", "-timeout", "10000000", "-user_agent",
        headers["User-Agent"], "-i",
        base_url(voice_encode_fileid),
        str(_dir.joinpath(video_title + '.mp3'))
    ]
    if not _dir.joinpath(video_title + '.mp3').exists():
        subprocess.check_call(cmd)
        logger.info(
            f'URL: {base_url(voice_encode_fileid)}\nSaved to: {_dir.joinpath(video_title + ".mp3")}\nStatus: done\n'
        )
    else:
        logger.info(
            f'URL: {base_url(voice_encode_fileid)}\nSaved to: {_dir.joinpath(video_title + ".mp3")}\nStatus: skipped, the file has already been downloaded\n'
        )
def main():
    # each row is (ID, DATE_TIME, TITLE, CONTENT_URL)
    datas = filter_by_table_name()
    for __datas in datas:
        the_article_date = __datas[1]
        the_article_title = __datas[2]
        the_article_links = __datas[3]
        requestDetailsPage(the_article_links, the_article_date, the_article_title)

if __name__ == "__main__":
    main()
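One possible tweak, not in the original scripts: the first script defines fileNameFiltering() but never calls it, while here the article title goes straight into the save path, so titles containing characters such as ? or | would make the directory creation fail on Windows. A sketch of running the title through the same kind of filter first:
[Python]
import re

def file_name_filtering(title):
    # replace characters that are illegal in Windows file and directory names
    title = title.strip()
    return re.sub(r'[\\/:*?"<>|]', '-', title)

# e.g. in main() above:
# requestDetailsPage(the_article_links, the_article_date, file_name_filtering(the_article_title))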
Database helper functions (the sqlite_parse module imported by both scripts above):
[Python]
import re
import sqlite3
from pathlib import Path
BASE_DIR = Path(__file__).parent
def creat_table(table_name):
conn = sqlite3.connect(BASE_DIR.joinpath('wxgz.db'))
c = conn.cursor()
c.execute(f'''CREATE TABLE IF NOT EXISTS t_{table_name}
(ID INTEGER PRIMARY KEY AUTOINCREMENT,DATE_TIME TEXT,TITLE TEXT,CONTENT_URL TEXT);''')
conn.commit()
conn.close()
def insert_data(table_name, DATE_TIME, TITLE, CONTENT_URL):
    conn = sqlite3.connect(BASE_DIR.joinpath('wxgz.db'))
    c = conn.cursor()
    cursor = c.execute(f"SELECT DATE_TIME,TITLE,CONTENT_URL from t_{table_name}")
    already_have = False
    for row in cursor:
        if (DATE_TIME, TITLE, CONTENT_URL) == row:  # compare the whole tuple, not tuple membership
            already_have = True
    if already_have is False:
        # bind the values as parameters so titles containing quotes do not break the SQL
        c.execute(f"INSERT INTO t_{table_name} (DATE_TIME,TITLE,CONTENT_URL) VALUES (?,?,?)",
                  (DATE_TIME, TITLE, CONTENT_URL))
    conn.commit()
    conn.close()
def offset_insert_create(table_name,offset,query=False):
conn = sqlite3.connect(BASE_DIR.joinpath('wxgz.db'))
c = conn.cursor()
c.execute(f'''CREATE TABLE IF NOT EXISTS t_{table_name}
(ID INTEGER PRIMARY KEY AUTOINCREMENT,OFFSET TEXT);''')
if query:
cursor = c.execute(f"SELECT offset from t_{table_name}")
if int(offset) in [int(row[0]) for row in cursor]:
return True
else:
c.execute(f"INSERT INTO t_{table_name} (offset) VALUES ('{offset}')")
conn.commit()
conn.close()
return False
def selet_data(table_name):
conn = sqlite3.connect(BASE_DIR.joinpath('wxgz.db'))
c = conn.cursor()
cursor = c.execute(f"SELECT DATE_TIME,TITLE,CONTENT_URL from t_{table_name}")
vid_list = [out_exp for out_exp in cursor]
conn.commit()
conn.close()
return vid_list
def filter_by_table_name():
    conn = sqlite3.connect(BASE_DIR.joinpath('wxgz.db'))
    c = conn.cursor()
    # keep only articles whose title mentions 肝胆, newest first
    cursor = c.execute("""SELECT * FROM "t_zhengdan" WHERE TITLE LIKE '%肝胆%' ORDER BY DATE_TIME DESC;""")
    datas = [row for row in cursor]
    conn.commit()
    conn.close()
    return datas
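A small variant of filter_by_table_name (a sketch, not part of the original module) that takes the keyword as an argument instead of hard-coding 肝胆, and binds the LIKE pattern as a parameter; it assumes the same wxgz.db layout as above.
[Python]
import sqlite3
from pathlib import Path

BASE_DIR = Path(__file__).parent

def filter_by_keyword(keyword, table_name='zhengdan'):
    # return all articles whose title contains the keyword, newest first
    conn = sqlite3.connect(BASE_DIR.joinpath('wxgz.db'))
    cursor = conn.execute(
        f"SELECT * FROM t_{table_name} WHERE TITLE LIKE ? ORDER BY DATE_TIME DESC",
        (f'%{keyword}%',))
    datas = cursor.fetchall()
    conn.close()
    return datas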
Reference post: https://www.52pojie.cn/thread-1651719-1-1.html