话痨司机啊 发表于 2022-6-29 10:59

【源码】抓PC版微信公众号历史记录以及文章内涵音频下载【20220630更新】

本帖最后由 话痨司机啊 于 2022-7-5 19:36 编辑

本身就是学医的,对健康方面的公众号关注较多,工作上可以使用到一些资料也来源于公众号,偶然机会发现有大佬发布了关于下载公众号历史文章的帖子,按着大佬思路,写了一个python版本的,主要用于下载公众号内的音频,希望对于有这方面需求的小伙伴一些帮助。


顺便说一下用到的分析软件:fiddler、SQLiteManager、postman、Apifox

把公众号“某中医药”数据导出来一份,里面有一些处方,希望帮助一些有慢性病的人,网址直接复制就可以访问,excel格式。


效果展示:
正在爬取,总计1629页


数据库结构


数据清洗后,提取有用的链接进行音频下载

音频下载完成



公众号音频:




入库函数:

"""
///公众号历史访问旧接口

https://mp.weixin.qq.com/mp/profile_ext?action=getmsg&__biz={公众号编号}&f=json&offset={offset}&count=10&is_ok=1&scene=124&uin={访问者编号}&key={key}&pass_ticket={pass_ticket}&wxtoken=&appmsg_token={appmsg_token}&x5=0&f=json
从上面分析能得出获取文章列表的url地址,其中
标红的每次访问时获取的参数,半小时有效
标蓝的为固定,同一公众号固定,同一访问者固定
标粉的为页面数,从0开始,一次10篇;第一页为0,第二页为10,第三页为20,类推

说明:offset里的一篇指的是一次推送,一次推送里可以是一篇文章也可以是多篇文章。

在获取到了一次的链接后,提取出{公众号编号}、{访问者编号}、{key}、{pass_ticket}、{appmsg_token},然后只需要在链接里修改{offset},就能达到读取所有的文章。
还是以 吾爱破解论坛 为例,通过修改offset,把最开始发布的文件找出来。
经过多次测试,确定893为最开始的一篇文章,894就没有文章了      

----------------------------------------------------------------------------------------------------------------------------------------
/// 音频获取思路

<mpvoice frameborder="0" class="res_iframe js_editor_audio audio_iframe"
                                                                              src="/cgi-bin/readtemplate?t=tmpl/audio_tmpl&name=20180704%E8%8A%82%E7%9B%AE%E5%9B%9E%E9%A1%BE2%20%E8%82%9D%E7%99%8C&play_length=29:09"
                                                                              isaac2="1" low_size="3407.11" source_size="3379.2" high_size="13669.02"
                                                                              name="20180704节目回顾2 肝癌" play_length="1749000"
                                                                              voice_encode_fileid="MzA4MzI1NDcyN18yNjUzMTIzNzI1"></mpvoice>
et.HTML(res.text)
voice_encode_fileid = et.xpath("//mpvoice[@class='res_iframe js_editor_audio audio_iframe']/@voice_encode_fileid)         
ffmpeg -> https://res.wx.qq.com/voice/getvoice?mediaid={voice_encode_fileid} # 音频获取
play_length = 1582858952
play_length = time.strftime("%H:%M:%S",time.gmtime(int(play_length))).replace(':','_') # 音频时长
name = name.replace(" ","+") # 视频title
title = name + ' ' + play_length + '.mp3'
-----------------------------------------------------------------------------------------------------------------------------------------
_biz:MzA4MzI1NDcyNw== 2022/6/30 10:02:37
_uin:MTAxNTI0MjI2Mw%3D%3D 2022/6/30 10:02:37
_key:45f73879b4e8a7788f8ed345d876e0698d71edb4c50c6b47f01e33b6844c7259cee75ff66e875b8925eba0a4e70c77e9b124068856fd4c982ef4f0e7959fd079a33a0480ed392d1c72358d88c84aca1ff574e57f4384c6a795432a4e686770ffb088378de347416b1c2713af2a8ae591936d047e8f3712646a0b79078429a91c 2022/6/30 10:02:37
_pass_ticket:i3LKKpAvxyb1I5dx8zKgcrsTpflI51%2B6AcdWLWSBNay8OypDc2rSdwG6ZAXqBDim 2022/6/30 10:02:37
_nickname:郑丹看医生 2022/6/30 10:02:37
_appmsg_token:1172_EbbxVNd0uUtdDKHH7TtooywkKtr24_ZmUa7THR1RNm-Eh0zPirkS2OZJjl5zn5mhFUyb5kqwK7sGQCVX 2022/6/30 10:02:38
"""

import re
import subprocess
import time
from pathlib import Path
from rich import print
import requests
from loguru import logger
from lxml import etree
from sqlite_parse import *
logger.add('wx.log',level='INFO')


def fileNameFiltering(title):
    '''
    文件名过滤
    '''
    title.strip()
    title = re.sub('[\/:*?"<>|]','-',title)
    return title
   
# 郑丹看医生 offset边界 1629
def obtaining_json_information(public_number,offset,visitor_number,key,pass_ticket,appmsg_token):
    """
    获取数据
    """
    global headers
    base_url = f"https://mp.weixin.qq.com/mp/profile_ext?action=getmsg&__biz={public_number}&f=json&offset={offset}&count=10&is_ok=1&scene=124&uin={visitor_number}&key={key}&pass_ticket={pass_ticket}&wxtoken=&appmsg_token={appmsg_token}&x5=0&f=json"
    headers = {
      "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36 NetType/WIFI MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x6307001e)"
    }
    jres = requests.get(base_url,headers=headers,timeout=5)
    process_the_data(jres.json())
    offset_insert_create(table_offset,offset,query=False)
    print(f'{offset}页已经加入数据库')
   
def parameters_to_main():
    """
    获取参数并请求
    """
    public_number = "*****==" # 固定 biz
    visitor_number = "MTAxNT*********D%3D" # 固定 uin
    key ="45f738******25eba0a4e70c77e9b124068856fd4c982ef4f0e7959fd079a33a0480ed392d1c72358d88c84aca1ff574e57f4384c6a795432a4e686770ffb088378de347416b1c2713af2a8ae591936d047e8f3712646a0b79078429a91c" # 半小时时效
    pass_ticket ="i3LKKpAvxyb1I5dx8zsypDc2rSdwG6Z*******AXqBDim" # 半小时时效
    appmsg_token ="1172_EbbxVNd0u*****hFUyb5kqwK7sGQCVX" # 半小时时效
    offers = 1629 # 页码
    for offset in range(60,offers+1,10):
      time.sleep(2)
      if not offset_insert_create(table_offset,offset,query=True):
            obtaining_json_information(public_number,offset,visitor_number,key,pass_ticket,appmsg_token)
      else:
            print(f'{offset}页,已经下载完成')
            
def process_the_data(data):
    """
    处理数据
    """
    # print(data)
    try:
      for i in eval(data.get('general_msg_list')).get('list'):
            date_time = time.strftime("%Y-%m-%d",time.gmtime(int(i.get('comm_msg_info').get('datetime'))))
            if date_time:
                insert_data(table_name,date_time,i.get('app_msg_ext_info').get('title'),i.get('app_msg_ext_info').get('content_url'))
                if i.get('app_msg_ext_info').get('multi_app_msg_item_list') is not None:
                        for x in i.get('app_msg_ext_info').get('multi_app_msg_item_list'):
                            insert_data(table_name,date_time,x['title'],x['content_url'])
    except Exception as e:
            logger.warning(e)

def main():
    '''
    入口,逻辑主函数
    '''

    global table_name,table_offset
    table_name = 'zhengdan'
    table_offset = "offset"
    creat_table(table_name)
    parameters_to_main()
   
if __name__ == "__main__":
    main()


"""
音频下载

"""

import requests
from pathlib import Path
from rich import print
import requests
from loguru import logger
from lxml import etree
from sqlite_parse import *
import subprocess
import time

logger.add('wx.log', level='INFO')
table_name = 'zhengdan'


def requestDetailsPage(the_article_links, the_article_date, the_article_title):
    '''
    请求详情地址
    '''
    global headers
    headers = {
      "User-Agent":
      "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36 NetType/WIFI MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x6307001e)"
    }
    html = requests.get(the_article_links, headers=headers)
    _datas = filterData(the_article_links, html)
    if _datas is not None:
      video_title, voice_encode_fileid = _datas
      processSaveAudio(the_article_date, the_article_title, video_title,
                         voice_encode_fileid)


def filterData(the_article_links, html):
    '''
    获取公众号内音频下载信息
    '''
    try:
      et = etree.HTML(html.text)
      # rich_pages res_iframe js_editor_audio audio_iframe
      # //mpvoice[@class='rich_pages res_iframe js_editor_audio audio_iframe place_audio_area']/
      # |//mpvoice[@class='js_editor_audio audio_iframe res_iframe js_uneditable custom_select_card']/
      voice_encode_fileid = et.xpath(
            "//mpvoice/@voice_encode_fileid"
      )
      play_length = et.xpath(
            "//mpvoice/@play_length"
      )
      play_length = time.strftime("%H:%M:%S",
                                    time.gmtime(int(play_length))).replace(
                                        ':', '_')
      name = et.xpath(
            "//mpvoice/@name"
      )
      name = name.replace(" ", "+")
      video_title = name + " " + play_length
      return video_title, voice_encode_fileid
    except Exception as e:
      logger.error(f'出问题的网址是:{the_article_links},问题是:{e}')


def processSaveAudio(the_article_date, the_article_title, video_title,
                     voice_encode_fileid):
    '''
    下载函数
    '''
    _dir = Path(__file__).parent.joinpath(the_article_date).joinpath(
      the_article_title)# 保存目录
    _dir.mkdir(parents=True, exist_ok=True)# 创建目录
    _ffmpeg = Path(__file__).parent.joinpath('ffmpeg.exe')
    base_url = lambda voice_encode_fileid: f"https://res.wx.qq.com/voice/getvoice?mediaid={voice_encode_fileid}"
    cmd = [
      str(_ffmpeg), "-y", "-timeout", "10000000", "-user_agent",
      headers["User-Agent"], "-i",
      base_url(voice_encode_fileid),
      "{path}".format(path=str(_dir.joinpath(video_title + '.mp3')))
    ]
    if not _dir.joinpath(video_title + '.mp3').exists():
      subprocess.check_call(cmd)
      logger.info(
            f'网址:{base_url(voice_encode_fileid)}\n存储路径:{_dir.joinpath(video_title + ".mp3")}\n状态:已完成\n'
      )
    else:
         logger.info(
            f'网址:{base_url(voice_encode_fileid)}\n存储路径:{_dir.joinpath(video_title + ".mp3")}\n,状态:不能下载\n原因:文件已经下载完成,不能重复下载!!'
      )


def main():
    datas = filter_by_table_name()
    for __datas in datas:
      the_article_date = __datas
      the_article_title = __datas
      the_article_links = __datas
      requestDetailsPage(the_article_links, the_article_date,
                           the_article_title)


if __name__ == "__main__":
    main()

操作数据库函数:
import re
import sqlite3
from pathlib import Path

BASE_DIR = Path(__file__).parent

def creat_table(table_name):
    conn = sqlite3.connect(BASE_DIR.joinpath('wxgz.db'))
    c = conn.cursor()
    c.execute(f'''CREATE TABLE IF NOT EXISTS t_{table_name}
         (ID INTEGER PRIMARY KEY AUTOINCREMENT,DATE_TIME TEXT,TITLE TEXT,CONTENT_URL TEXT);''')
    conn.commit()
    conn.close()


def insert_data(table_name,DATE_TIME,TITLE,CONTENT_URL):
    conn = sqlite3.connect(BASE_DIR.joinpath('wxgz.db'))
    c = conn.cursor()
    cursor = c.execute(f"SELECT DATE_TIME,TITLE,CONTENT_URL from t_{table_name}")
    already_have = False
    for row in cursor:
      if (DATE_TIME,TITLE,CONTENT_URL) in row:
            already_have = True
    if already_have is False:
      c.execute(f"INSERT INTO t_{table_name} (DATE_TIME,TITLE,CONTENT_URL) VALUES ('{DATE_TIME}','{TITLE}','{CONTENT_URL}')")
    conn.commit()
    conn.close()
   
def offset_insert_create(table_name,offset,query=False):
    conn = sqlite3.connect(BASE_DIR.joinpath('wxgz.db'))
    c = conn.cursor()
    c.execute(f'''CREATE TABLE IF NOT EXISTS t_{table_name}
         (ID INTEGER PRIMARY KEY AUTOINCREMENT,OFFSET TEXT);''')
    if query:
      cursor = c.execute(f"SELECT offset from t_{table_name}")
      if int(offset) in ) for row in cursor]:
            return True
    else:
      c.execute(f"INSERT INTO t_{table_name} (offset) VALUES ('{offset}')")
    conn.commit()
    conn.close()
    return False

   
   

def selet_data(table_name):
    conn = sqlite3.connect(BASE_DIR.joinpath('wxgz.db'))
    c = conn.cursor()
    cursor = c.execute(f"SELECT DATE_TIME,TITLE,CONTENT_URL from t_{table_name}")
    vid_list =
    conn.commit()
    conn.close()
    return vid_list

def filter_by_table_name():
    conn = sqlite3.connect(BASE_DIR.joinpath('wxgz.db'))
    c = conn.cursor()
    cursor = c.execute(f"""SELECT * FROM "t_zhengdan" WHERE TITLE LIKE'%肝胆%' ORDER BY "DATE_TIME DES";""")
    datas =
    conn.commit()
    conn.close()
    return datas

参考文章:https://www.52pojie.cn/thread-1651719-1-1.html

话痨司机啊 发表于 2023-5-11 01:25

本帖最后由 话痨司机啊 于 2023-5-11 01:26 编辑

wowo9966 发表于 2023-5-10 22:23
我也是非常的需要,把这个文章采下来,然后pdf或者其他格式保存下
https://www.52pojie.cn/thread-1765143-1-1.html


如何将保存的微信文章连同评论优雅地转为电子书 - 『编程语言区』 - 吾爱破解 - LCG - LSG |安卓破解|病毒分析|www.52pojie.cn

话痨司机啊 发表于 2022-8-19 22:16

lanyi10000 发表于 2022-8-19 21:43
大佬发布了关于下载公众号历史文章的帖子

同学医的,请问楼主说的大佬帖子是?

https://www.52pojie.cn/thread-1651719-1-1.html

longdoushi 发表于 2022-6-29 11:35

好东西,学习下,感谢

尹铭 发表于 2022-6-29 11:35

不明觉厉~!

xiaoshu1688 发表于 2022-6-29 11:41

感谢分享。不错。

亿联网络 发表于 2022-6-29 11:45

医学生里面编程最牛的{:1_921:}

黑羽快斗 发表于 2022-6-29 12:28

谢谢lz的提供

的士不载人 发表于 2022-6-29 12:38

内涵音频是啥意思

话痨司机啊 发表于 2022-6-29 12:56

的士不载人 发表于 2022-6-29 12:38
内涵音频是啥意思

就是公众号文章里有音频

NMG69 发表于 2022-6-29 14:30

感谢大佬分享

mokson 发表于 2022-6-29 14:44

页: [1] 2 3 4 5
查看完整版本: 【源码】抓PC版微信公众号历史记录以及文章内涵音频下载【20220630更新】