吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1937|回复: 49
上一主题 下一主题
收起左侧

[Python 原创] M3U8简单下载器 - 源码

  [复制链接]
跳转到指定楼层
楼主
枫雪 发表于 2025-3-10 09:47 回帖奖励
M3U8简单下载器 - 源码

[Python] 纯文本查看 复制代码
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import sys
import os
from urllib.parse import urlparse
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit,
                             QPushButton, QTextEdit,
                             QFileDialog, QMessageBox)
from PyQt5.QtCore import QProcess, QByteArray
 
 
class M3U8Downloader(QMainWindow):
    def __init__(self):
        super().__init__()
        self.log_text = QTextEdit()
        self.download_btn = QPushButton('开始下载')
        self.browse_btn = QPushButton('浏览...')
        self.path_input = QLineEdit()
        self.filename_input = QLineEdit()
        self.url_input = QLineEdit()
        self.current_dir = os.path.dirname(os.path.abspath(__file__))
        self.initUI()
        self.process = QProcess(self)
        self.process.readyReadStandardOutput.connect(self.handle_stdout)
        self.process.readyReadStandardError.connect(self.handle_stderr)
        self.process.finished.connect(self.process_finished)
 
    def initUI(self):
        self.setWindowTitle('M3U8视频下载器')
        self.setGeometry(100, 100, 800, 600)
 
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
 
        main_layout = QVBoxLayout()
 
        url_layout = QHBoxLayout()
        self.url_input.setPlaceholderText('输入M3U8视频链接...')
        self.url_input.textChanged.connect(self.update_filename)
        url_layout.addWidget(QLabel('视频链接:'))
        url_layout.addWidget(self.url_input)
 
        filename_layout = QHBoxLayout()
        self.filename_input.setPlaceholderText('自动生成文件名')
        filename_layout.addWidget(QLabel('保存名称:'))
        filename_layout.addWidget(self.filename_input)
 
        path_layout = QHBoxLayout()
        default_download_dir = os.path.join(self.current_dir, 'download')
        os.makedirs(default_download_dir, exist_ok=True)
        self.path_input.setText(default_download_dir)
        self.browse_btn.clicked.connect(self.select_output_dir)
        path_layout.addWidget(QLabel('保存路径:'))
        path_layout.addWidget(self.path_input)
        path_layout.addWidget(self.browse_btn)
 
        self.download_btn.clicked.connect(self.start_download)
 
        self.log_text.setReadOnly(True)
 
        main_layout.addLayout(url_layout)
        main_layout.addLayout(filename_layout)
        main_layout.addLayout(path_layout)
        main_layout.addWidget(self.download_btn)
        main_layout.addWidget(QLabel('下载日志:'))
        main_layout.addWidget(self.log_text)
        central_widget.setLayout(main_layout)
 
    def update_filename(self):
        url = self.url_input.text().strip()
        if not url:
            self.filename_input.setText('')
            return
 
        parsed = urlparse(url)
        path = parsed.path
        base_name = os.path.basename(path)
        if '.' in base_name:
            name = os.path.splitext(base_name)[0]
        else:
            name = 'output'
 
        self.filename_input.setText(f"{name}.mp4")
 
    def select_output_dir(self):
        directory = QFileDialog.getExistingDirectory(self, '选择保存路径')
        if directory:
            self.path_input.setText(directory)
 
    def start_download(self):
        url = self.url_input.text().strip()
        save_dir = self.path_input.text().strip()
        filename = self.filename_input.text().strip()
 
        if not url:
            QMessageBox.warning(self, '错误', '请输入有效的M3U8链接')
            return
 
        if not os.path.exists(save_dir):
            QMessageBox.warning(self, '错误', '保存路径不存在')
            return
 
        if not filename.lower().endswith('.mp4'):
            filename += '.mp4'
 
        # 构建命令参数
        args = [
            '--save-name', f'{filename[:-4]}',
            '--tmp-dir', f'{os.path.join(save_dir, 'temp')}',
            '--save-dir', f'{save_dir}',
            '--ffmpeg-binary-path', f'{os.path.join(self.current_dir, "bin", "ffmpeg.exe")}',
            '--del-after-done',
            '--force-ansi-console',
            '--auto-select',
            # '--custom-proxy',f'http://127.0.0.1:7890',
            f'{url}'
        ]
        # 测试视频地址
        # https://sf1-cdn-tos.huoshanstatic.com/obj/media-fe/xgplayer_doc_video/hls/xgplayer-demo.m3u8
        # https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8
 
        self.log_text.clear()
        self.log(f'开始下载:{url}')
        self.log(f'保存路径:{os.path.join(save_dir, filename)}')
        self.download_btn.setEnabled(False)
 
        # 启动下载进程
        executable_path = os.path.join(self.current_dir, 'bin', 'N_m3u8DL-RE.exe')
        self.process.start(executable_path, args)
 
    def handle_stdout(self):
        data = self.process.readAllStandardOutput()
        self.log(QByteArray(data).data().decode('utf-8', errors='ignore'))
 
    def handle_stderr(self):
        data = self.process.readAllStandardError()
        self.log(QByteArray(data).data().decode('utf-8', errors='ignore'), error=True)
 
    def process_finished(self, exit_code):
        self.download_btn.setEnabled(True)
        if exit_code == 0:
            self.log(f'下载完成!文件保存为:{self.filename_input.text()}')
        else:
            self.log(f'下载失败,错误码:{exit_code}')
 
    def log(self, message, error=False):
        color = 'red' if error else 'black'
        self.log_text.append(f'<span style="color:{color}">{message}</span>')
 
 
if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = M3U8Downloader()
    screen_resolution = app.desktop().screenGeometry()
    window_resolution = window.geometry()
    left = (screen_resolution.width() - window_resolution.width()) / 2
    top = (screen_resolution.height() - window_resolution.height()) / 2
    window.move(int(left), int(top))
    window.show()
    sys.exit(app.exec_())


下载地址:https://fbio.lanzout.com/i6EZD2q5kd3a

免费评分

参与人数 6吾爱币 +12 热心值 +6 收起 理由
xhao43 + 1 + 1 谢谢@Thanks!
苏紫方璇 + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
af8889 + 1 + 1 能做成水果版的吗?
木林森森林木 + 1 + 1 热心回复!
gunxsword + 1 + 1 热心回复!
xyz2000cn007 + 1 + 1 热心回复!

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

推荐
shaunkelly 发表于 2025-3-10 12:48

import sys
import os
import re
import time
from datetime import datetime
from urllib.parse import urlparse, unquote
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QLineEdit, QPushButton, QTextEdit, QFileDialog,
QMessageBox, QCheckBox, QProgressBar)
from PyQt5.QtCore import QProcess, Qt

class M3U8Downloader(QMainWindow):
def init(self):
super().init()

初始化组件

    self.process = QProcess(self)
    self.current_dir = os.path.dirname(os.path.abspath(__file__))
    self.log_file = open('download.log', 'a', encoding='utf-8')
    self.init_ui()
    self.init_process()

def init_ui(self):
    """初始化用户界面"""
    self.setWindowTitle('M3U8视频下载器 PRO')
    self.setGeometry(200, 200, 1000, 800)

    # 主控件
    central_widget = QWidget()
    self.setCentralWidget(central_widget)
    main_layout = QVBoxLayout(central_widget)

    # 输入组件
    self.url_input = QLineEdit(placeholderText='输入M3U8链接(支持http/https)')
    self.url_input.textChanged.connect(self.auto_generate_filename)
    self.filename_input = QLineEdit(placeholderText='自动生成文件名')
    self.path_input = QLineEdit()
    self.path_input.setText(os.path.normpath(os.path.join(self.current_dir, 'downloads')))

    # 代理设置
    self.proxy_check = QCheckBox("使用代理")
    self.proxy_input = QLineEdit(placeholderText='127.0.0.1:7890')

    # 功能按钮
    self.browse_btn = QPushButton('选择路径', clicked=self.select_output_dir)
    self.download_btn = QPushButton('开始下载', clicked=self.start_download)
    self.download_btn.setStyleSheet("background-color: #4CAF50; color: white;")

    # 进度条
    self.progress_bar = QProgressBar()
    self.progress_bar.setAlignment(Qt.AlignCenter)

    # 日志显示
    self.log_text = QTextEdit(readOnly=True)
    self.log_text.setStyleSheet("font-family: Consolas; font-size: 10pt;")

    # 布局管理
    input_layout = self.create_input_layout()
    proxy_layout = self.create_proxy_layout()

    main_layout.addLayout(input_layout)
    main_layout.addLayout(proxy_layout)
    main_layout.addWidget(self.progress_bar)
    main_layout.addWidget(self.download_btn)
    main_layout.addWidget(QLabel('下载日志:'))
    main_layout.addWidget(self.log_text)

def init_process(self):
    """初始化进程信号连接"""
    self.process.readyReadStandardOutput.connect(self.handle_stdout)
    self.process.readyReadStandardError.connect(self.handle_stderr)
    self.process.finished.connect(self.on_process_finished)
    self.process.errorOccurred.connect(self.handle_process_error)

def create_input_layout(self):
    """创建输入区域布局"""
    layout = QVBoxLayout()

    url_layout = QHBoxLayout()
    url_layout.addWidget(QLabel('视频链接:'))
    url_layout.addWidget(self.url_input)

    file_layout = QHBoxLayout()
    file_layout.addWidget(QLabel('保存名称:'))
    file_layout.addWidget(self.filename_input)

    path_layout = QHBoxLayout()
    path_layout.addWidget(QLabel('保存路径:'))
    path_layout.addWidget(self.path_input)
    path_layout.addWidget(self.browse_btn)

    layout.addLayout(url_layout)
    layout.addLayout(file_layout)
    layout.addLayout(path_layout)
    return layout

def create_proxy_layout(self):
    """创建代理设置布局"""
    layout = QHBoxLayout()
    layout.addWidget(self.proxy_check)
    layout.addWidget(self.proxy_input)
    layout.addStretch()
    return layout

def auto_generate_filename(self):
    """智能生成文件名"""
    url = self.url_input.text().strip()
    if not url:
        return

    try:
        parsed = urlparse(unquote(url))  # URL解码处理
        base_name = os.path.basename(parsed.path).split('?')[0]  # 去除查询参数
        if not base_name or '.' not in base_name:
            domain_part = parsed.netloc.split('.')[-2] if '.' in parsed.netloc else "video"
            base_name = f"{domain_part}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.filename_input.setText(base_name)
    except Exception as e:
        self.log_message('WARN', f'自动命名失败: {str(e)}')
        self.filename_input.setText(f"video_{int(time.time())}")

def validate_inputs(self):
    """输入验证"""
    if not self.url_input.text().startswith(('http://', 'https://')):
        QMessageBox.warning(self, '警告', '请输入有效的HTTP(S)链接')
        return False

    if not os.access(self.path_input.text(), os.W_OK):
        QMessageBox.critical(self, '错误', '目标路径不可写,请选择其他目录')
        return False

    return True

def build_ffmpeg_command(self):
    """构建FFmpeg命令"""
    output_path = os.path.join(
        self.path_input.text(),
        f"{self.filename_input.text()}.mp4"
    )
    return [
        'ffmpeg',
        '-i', self.url_input.text(),
        '-c', 'copy',
        '-y',  # 覆盖已存在文件
        output_path
    ]

def start_download(self):
    """启动下载进程"""
    if not self.validate_inputs():
        return

    try:
        # 确保输出目录存在
        os.makedirs(self.path_input.text(), exist_ok=True)

        # 配置进程参数
        self.process.setProgram('ffmpeg')
        self.process.setArguments(self.build_ffmpeg_command()[1:])  # 跳过程序名

        # 设置代理环境变量(如果需要)
        if self.proxy_check.isChecked():
            proxy = self.proxy_input.text()
            env = self.process.processEnvironment()
            env.insert('HTTP_PROXY', proxy)
            env.insert('HTTPS_PROXY', proxy)
            self.process.setProcessEnvironment(env)

        self.download_btn.setEnabled(False)
        self.process.start()
        self.log_message('INFO', '下载进程已启动...')
    except Exception as e:
        self.log_message('ERROR', f'进程启动失败: {str(e)}')
        QMessageBox.critical(self, '错误', '无法启动下载进程')

def handle_stdout(self):
    """处理标准输出"""
    data = self.process.readAllStandardOutput().data().decode()
    self.log_message('INFO', data)

    # 解析进度信息
    if match := re.search(r'time=(\d+:\d+:\d+\.\d+)', data):
        self.log_text.append(f"▶ 当前进度: {match.group(1)}")

def handle_stderr(self):
    """处理错误输出"""
    data = self.process.readAllStandardError().data().decode()
    self.log_message('ERROR', data)

def handle_process_error(self, error):
    """处理进程错误"""
    self.log_message('CRITICAL', f'进程异常: {error.name}')
    QMessageBox.critical(self, '错误', f'进程错误: {error.name}')

def on_process_finished(self, exit_code, exit_status):
    """进程结束处理"""
    self.download_btn.setEnabled(True)
    if exit_code == 0:
        self.log_message('SUCCESS', '下载完成!')
        QMessageBox.information(self, '完成', '视频下载成功')
    else:
        self.log_message('ERROR', f'进程异常退出,代码: {exit_code}')

def select_output_dir(self):
    """选择保存目录"""
    path = QFileDialog.getExistingDirectory(
        self,
        "选择保存目录",
        self.path_input.text(),
        QFileDialog.ShowDirsOnly
    )
    if path:
        self.path_input.setText(os.path.normpath(path))

def log_message(self, level, message):
    """分级日志记录"""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[{timestamp}] [{level}] {message}"

    # 控制台颜色显示
    if level == 'ERROR':
        formatted = f'<span style="color: red;">{log_entry}</span>'
    elif level == 'SUCCESS':
        formatted = f'<span style="color: green;">{log_entry}</span>'
    else:
        formatted = log_entry

    self.log_text.append(formatted)
    self.log_file.write(log_entry + '\n')
    self.log_file.flush()

def closeEvent(self, event):
    """窗口关闭时清理资源"""
    self.log_file.close()
    if self.process.state() == QProcess.Running:
        self.process.terminate()
    event.accept()

if name == 'main':
app = QApplication(sys.argv)
window = M3U8Downloader()
window.show()
sys.exit(app.exec_())




主要改进说明:
模块化设计:
使用init_ui和init_process分离界面和逻辑初始化
通过create_input_layout和create_proxy_layout实现布局组件化
独立的验证方法validate_inputs
增强功能:
智能文件名生成(支持URL解码和时间戳)
代理服务器支持(HTTP/HTTPS代理)
进度信息解析(正则匹配时间戳)
多级日志系统(控制台颜色+文件记录)
健壮性提升:
进程错误处理(errorOccurred信号绑定)
路径标准化处理(os.path.normpath)
资源清理(关闭日志文件、终止进程)
用户体验优化:
实时进度显示
带颜色的日志分级
友好的错误提示对话框
禁用按钮防止重复操作
沙发
ouran2006 发表于 2025-3-10 10:10
3#
winwoo 发表于 2025-3-10 10:14
4#
q8488761 发表于 2025-3-10 10:24
感谢分享!已下载!
5#
fy9631 发表于 2025-3-10 10:34
收藏了,感谢楼主
6#
linsixi 发表于 2025-3-10 10:54
不错,一直在找这种模式的源码。。简单好用。
7#
jubaicc 发表于 2025-3-10 10:58

收藏,感谢分享
8#
天天涨停天天盈 发表于 2025-3-10 11:03
蹲一个使用评价~
9#
Corgibro 发表于 2025-3-10 11:38
哇,这个可以的,很有用
10#
tianyagk1314 发表于 2025-3-10 11:40
下载了  ,感谢分享
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-4-20 21:02

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表