多些如果 发表于 2021-6-4 10:16

关于python 多线程/多进程 下载多个文件

import os
import re
import concurrent.futures
import datetime
import json
import subprocess
import logging
import re
from multiprocessing import Process
###定义类###
class APK:
    def __init__(self, name, url, dir_path, file_path):
      self.__name = name
      self.__url = url
      self.__dir_path = dir_path
      self.__file_path = file_path
    def get_name(self):
      return self.__name
    def get_url(self):
      return self.__url
   
    def get_dir_path(self):
      return self.__dir_path
   
    def get_file_path(self):
      return self.__file_path
    ###下载模块###
def download_apk(apk):
    try:
      if 'make.file' in apk.get_url():
            cmd = ["wget", token, "-O Android.mk", apk.get_url()]
      else:
            cmd = ["wget", token, apk.get_url()]
      
      sp_output = None
      if os.path.exists(apk.get_dir_path()):
            os.chdir(apk.get_dir_path())
            for i in ["app-release.apk", "Android.mk", "app-debug.apk"]:
                if os.path.exists(i):
                  os.remove(i)
            logging.info("%s start download ", apk.get_file_path())
            logging.info("Download url is %s ",apk.get_url())
            sp_output = subprocess.check_output(" ".join(cmd), shell=True, stderr=subprocess.PIPE)
            logging.info("%s download successfully", apk.get_file_path())
      else:
            os.makedirs(apk.get_dir_path())
            os.chdir(apk.get_dir_path())
            logging.info("%s start download ", apk.get_file_path())
            logging.info("Download url is %s ",apk.get_url())
            sp_output = subprocess.check_output(" ".join(cmd), shell=True, stderr=subprocess.PIPE)
            logging.info("%s download successfully", apk.get_file_path())
    except subprocess.CalledProcessError as e:
      logging.error(e)
      print(sp_output)
      print(e.output)
      raise e
if __name__=='__main__':
    log_file = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    logging.basicConfig(level=logging.INFO, filename=log_file, filemode='a', format='%(asctime)s : %(message)s')
    url_prefix = 'http://ip:port/dir/apps'
    down_dir = '/home/xxx/xxx'
    token = "--user=xxx --password=xxx"
    ###处理字符串,返回下载列表###
    file_list = []
    with open('result.json','r') as f:
      for number,line in enumerate(f,start=1):
            if "V_Build" in line:
                continue
            
            if 'make.file' in line or 'apk' in line:
                line_formated = line.split(':')[-1].replace('"','').replace(',','').replace('\n','').replace(' ','')
                res = re.search(r"\d{4}(\-|\/|.)\d{1,2}\1\d{1,2}", line_formated)
                if not res:
                  file_list.append(line_formated)
    print(file_list)
    print(len(file_list))
    ###获取类的集合###
      
    down_list = []
    for name in file_list:
      dir_path = '/'.join(name.split('/')[:-1])
      abs_dir_path = down_dir + dir_path
      abs_file_path = abs_dir_path + name.split('/')[-1]
      url = url_prefix + name
      apk = APK(name, url, abs_dir_path, abs_file_path)
      down_list.append(apk)
    ###线程/进程集合###   
    task = []
    # for apk in down_list:
    #   p = Process(target=download_apk,args=(apk,))
    #   p.start()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as e:
      for apk in down_list:
            task.append(e.submit(download_apk, apk))
    ###读取返回值###   
    for i in task:
      if i:
            print(i)

本人编程小白,请教各位大佬,以下几个问题:
1、在线程/进程集合 部分,在运行脚本的时候,发现用进程下载文件 耗时比用线程池少且下载完全;而用线程池的下载文件的时候,发现文件下载不全,例如要下载30个文件,实际上只下载了20,这是为啥?

2、本人了解到,shell命令curl是采用多线程下载文件,在此代码中如果用curl 会有什么不妥嘛?不管多进程还是多线程。

wzh123456789 发表于 2021-6-4 11:28

因为python有GIL锁 你换个python解释器比如jpython就行了

Magicy 发表于 2021-6-4 12:39

好像是有个全局锁的问题?

thepoy 发表于 2021-6-4 12:51

对楼上的看法我不是很同意,GIL锁对 IO 密集型操作没有影响,和多进程效率差不多。在 CPU 密集型操作下,多进程和多线程的效率差距才会明显。
爬虫或下载属于 IO 密集型操作,差距不会明显,除非你的代码有问题。
至于为什么多线程没有下载完 30 个,是因为你 task.append 添加了任务,但没有等待任务完成。想要完成所有线程,需要在主线程阻塞等待所有线程。

benty 发表于 2021-6-4 13:14

用python进程池from multiprocessing import Pool

多些如果 发表于 2021-6-7 11:34

thepoy 发表于 2021-6-4 12:51
对楼上的看法我不是很同意,GIL锁对 IO 密集型操作没有影响,和多进程效率差不多。在 CPU 密集型操作下,多 ...

首先,谢谢大佬

我用的线程池,线程池不是默认阻塞的嘛?

halfone 发表于 2021-6-7 15:35

import os
import re
import concurrent.futures
import datetime
import json
import subprocess
import logging
import re
from multiprocessing import Process
###定义类###
class APK:
    def __init__(self, name, url, dir_path, file_path):
      self.__name = name
      self.__url = url
      self.__dir_path = dir_path
      self.__file_path = file_path
    def get_name(self):
      return self.__name
    def get_url(self):
      return self.__url
   
    def get_dir_path(self):
      return self.__dir_path
   
    def get_file_path(self):
      return self.__file_path
    ###下载模块###
def download_apk(apk):
    try:
      if 'make.file' in apk.get_url():
            cmd = ["wget", token, "-O Android.mk", apk.get_url()]
      else:
            cmd = ["wget", token, apk.get_url()]
      
      sp_output = None
      if os.path.exists(apk.get_dir_path()):
            os.chdir(apk.get_dir_path())
            for i in ["app-release.apk", "Android.mk", "app-debug.apk"]:
                if os.path.exists(i):
                  os.remove(i)
            logging.info("%s start download ", apk.get_file_path())
            logging.info("Download url is %s ",apk.get_url())
            sp_output = subprocess.check_output(" ".join(cmd), shell=True, stderr=subprocess.PIPE)
            logging.info("%s download successfully", apk.get_file_path())
      else:
            os.makedirs(apk.get_dir_path())
            os.chdir(apk.get_dir_path())
            logging.info("%s start download ", apk.get_file_path())
            logging.info("Download url is %s ",apk.get_url())
            sp_output = subprocess.check_output(" ".join(cmd), shell=True, stderr=subprocess.PIPE)
            logging.info("%s download successfully", apk.get_file_path())
    except subprocess.CalledProcessError as e:
      logging.error(e)
      print(sp_output)
      print(e.output)
      raise e
if __name__=='__main__':
    log_file = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    logging.basicConfig(level=logging.INFO, filename=log_file, filemode='a', format='%(asctime)s : %(message)s')
    url_prefix = 'http://ip:port/dir/apps'
    down_dir = '/home/xxx/xxx'
    token = "--user=xxx --password=xxx"
    ###处理字符串,返回下载列表###
    file_list = []
    with open('result.json','r') as f:
      for number,line in enumerate(f,start=1):
            if "V_Build" in line:
                continue
            
            if 'make.file' in line or 'apk' in line:
                line_formated = line.split(':')[-1].replace('"','').replace(',','').replace('\n','').replace(' ','')
                res = re.search(r"\d{4}(\-|\/|.)\d{1,2}\1\d{1,2}", line_formated)
                if not res:
                  file_list.append(line_formated)
    print(file_list)
    print(len(file_list))
    ###获取类的集合###
      
    down_list = []
    for name in file_list:
      dir_path = '/'.join(name.split('/')[:-1])
      abs_dir_path = down_dir + dir_path
      abs_file_path = abs_dir_path + name.split('/')[-1]
      url = url_prefix + name
      apk = APK(name, url, abs_dir_path, abs_file_path)
      down_list.append(apk)
    ###线程/进程集合###   
    task = []
    # for apk in down_list:
    #   p = Process(target=download_apk,args=(apk,))
    #   p.start()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as e:
      for apk in down_list:
            task.append(e.submit(download_apk, apk))
    ###读取返回值###   
    for i in task:
      if i:
            print(i)

代码格式化一下,大家看着可以方便点。

thepoy 发表于 2021-6-7 16:19

多些如果 发表于 2021-6-7 11:34
首先,谢谢大佬

我用的线程池,线程池不是默认阻塞的嘛?

submit 相当于丢给线程池一个在未来执行的任务。
如果你只丢任务,不管这些任务的执行状态,很容易出现部分线程未执行或未执行完成即退出主线程,结束程序。
你需要监听这些未来任务。
方法:
每个任务有一个 done 方法,意味着这个任务完成。
futures 包中也有一个 wait 方法,可以等待所有任务完成。
不过上面两个方法都是主线程主动查询子线程,这是没有必要的。
最好的办法是用 futures 包里的 as_completed 让子线程完成任务后主动通知主线程。

from concurrent.futures import as_completed

for task in as_completed(tasks):
    data = task.result()
    # 处理 data
页: [1]
查看完整版本: 关于python 多线程/多进程 下载多个文件