吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 3066|回复: 5
收起左侧

[Python 原创] 基于Aria2的API接口进行批量下载源码

[复制链接]
禁之零零 发表于 2023-2-16 11:43
本帖最后由 禁之零零 于 2023-2-24 13:56 编辑

[Python] 纯文本查看 复制代码
import requests
import json


class Aria2Download:
    def __init__(self):
        self.api = "http://localhost:6800/jsonrpc"
        # 消息id,aria2会原样返回这个id,可以自动生成也可以用其他唯一标识
        self.id = "QXJpYU5nXzE2NzUxMzUwMDFfMC42Mzc0MDA5MTc2NjAzNDM="

    def addUri(self, url, path, file=None, proxy=None):
        """
        添加任务
        :param url: 文件下载地址
        :param path: 文件保存路径
        :param file: 文件保存名称
        :param proxy: 代{过}{滤}理地址
        :return:
        """
        data = {
            "id": self.id,
            "jsonrpc": "2.0",
            "method": "aria2.addUri",
            "params": [[url], {"dir": path, "out": file, "all-proxy": proxy}]
        }
        req = requests.post(url=self.api, data=json.dumps(data))
        return_json = req.json()
        req.close()
        # print("addUri", return_json)
        return return_json

    def getGlobalStat(self):
        """
        获取全部下载信息
        :return:
        """
        data = {
            "jsonrpc": "2.0",
            "method": "aria2.getGlobalStat",
            "id": self.id
        }
        req = requests.post(url=self.api, data=json.dumps(data))
        return_json = req.json()
        req.close()
        # print("getGlobalStat", return_json)
        return return_json

    def tellActive(self):
        """
        正在下载
        :return:
        """
        data = {
            "jsonrpc": "2.0",
            "method": "aria2.tellActive",
            "id": self.id, "params": [
                ["gid", "totalLength", "completedLength", "uploadSpeed", "downloadSpeed", "connections", "numSeeders",
                 "seeder", "status", "errorCode", "verifiedLength", "verifyIntegrityPending", "files", "bittorrent",
                 "infoHash"]]
        }

        req = requests.post(url=self.api, data=json.dumps(data))
        return_json = req.json()
        req.close()
        # print("getGlobalStat", return_json)
        return return_json

    def tellWaiting(self):
        """
        正在等待
        :return:
        """
        data = {"jsonrpc": "2.0", "method": "aria2.tellWaiting",
                "id": self.id,
                "params": [0, 1000, ["gid", "totalLength",
                                     "completedLength",
                                     "uploadSpeed",
                                     "downloadSpeed",
                                     "connections",
                                     "numSeeders",
                                     "seeder", "status",
                                     "errorCode",
                                     "verifiedLength",
                                     "verifyIntegrityPending"]
                           ]
                }
        req = requests.post(url=self.api, data=json.dumps(data))
        return_json = req.json()
        req.close()
        print("tellWaiting", return_json)
        return return_json

    def tellStopped(self):
        """
        已完成/已停止
        :return:
        """
        data = {"jsonrpc": "2.0",
                "method": "aria2.tellStopped",
                "id": self.id,
                "params": [-1, 1000, ["gid", "totalLength",
                                      "completedLength",
                                      "uploadSpeed",
                                      "downloadSpeed",
                                      "connections",
                                      "numSeeders", "seeder",
                                      "status", "errorCode",
                                      "verifiedLength",
                                      "verifyIntegrityPending"]]
                }
        req = requests.post(url=self.api, data=json.dumps(data))
        return_json = req.json()
        req.close()
        # print("tellStopped", return_json)
        return return_json

    def tellStatus(self, gid):
        """
        任务状态
        :param gid: 任务ID
        :return:
        """
        data = {"jsonrpc": "2.0", "method": "aria2.tellStatus", "id": self.id, "params": [gid]}
        req = requests.post(url=self.api, data=json.dumps(data))
        return_json = req.json()
        req.close()
        # print("tellWaiting", return_json)
        return return_json

    def removeDownloadResult(self, gid):
        """
        删除下载结束的任务
        :param gid: 任务ID
        :return:
        """
        data = {"jsonrpc": "2.0", "method": "aria2.removeDownloadResult", "id": self.id, "params": [gid]}
        req = requests.post(url=self.api, data=json.dumps(data))
        return_json = req.json()
        req.close()
        # print("removeDownloadResult", return_json)
        return return_json

免费评分

参与人数 2吾爱币 +1 热心值 +1 收起 理由
remlong77 + 1 我很赞同!
anderdawYang + 1 我很赞同!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

motto 发表于 2023-2-16 13:42
赞一个,回来看看
cxincn 发表于 2023-2-18 20:47
kmm996 发表于 2023-3-2 19:53
欧阳栓柱 发表于 2023-7-13 22:22
赞赞赞,终于找到aria2的接口了
hymnmx 发表于 2023-7-15 00:50
看看试试  感谢分享
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-1-11 10:15

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表