关于python 多线程/多进程 下载多个文件
import osimport re
import concurrent.futures
import datetime
import json
import subprocess
import logging
import re
from multiprocessing import Process
###定义类###
class APK:
def __init__(self, name, url, dir_path, file_path):
self.__name = name
self.__url = url
self.__dir_path = dir_path
self.__file_path = file_path
def get_name(self):
return self.__name
def get_url(self):
return self.__url
def get_dir_path(self):
return self.__dir_path
def get_file_path(self):
return self.__file_path
###下载模块###
def download_apk(apk):
try:
if 'make.file' in apk.get_url():
cmd = ["wget", token, "-O Android.mk", apk.get_url()]
else:
cmd = ["wget", token, apk.get_url()]
sp_output = None
if os.path.exists(apk.get_dir_path()):
os.chdir(apk.get_dir_path())
for i in ["app-release.apk", "Android.mk", "app-debug.apk"]:
if os.path.exists(i):
os.remove(i)
logging.info("%s start download ", apk.get_file_path())
logging.info("Download url is %s ",apk.get_url())
sp_output = subprocess.check_output(" ".join(cmd), shell=True, stderr=subprocess.PIPE)
logging.info("%s download successfully", apk.get_file_path())
else:
os.makedirs(apk.get_dir_path())
os.chdir(apk.get_dir_path())
logging.info("%s start download ", apk.get_file_path())
logging.info("Download url is %s ",apk.get_url())
sp_output = subprocess.check_output(" ".join(cmd), shell=True, stderr=subprocess.PIPE)
logging.info("%s download successfully", apk.get_file_path())
except subprocess.CalledProcessError as e:
logging.error(e)
print(sp_output)
print(e.output)
raise e
if __name__=='__main__':
log_file = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
logging.basicConfig(level=logging.INFO, filename=log_file, filemode='a', format='%(asctime)s : %(message)s')
url_prefix = 'http://ip:port/dir/apps'
down_dir = '/home/xxx/xxx'
token = "--user=xxx --password=xxx"
###处理字符串,返回下载列表###
file_list = []
with open('result.json','r') as f:
for number,line in enumerate(f,start=1):
if "V_Build" in line:
continue
if 'make.file' in line or 'apk' in line:
line_formated = line.split(':')[-1].replace('"','').replace(',','').replace('\n','').replace(' ','')
res = re.search(r"\d{4}(\-|\/|.)\d{1,2}\1\d{1,2}", line_formated)
if not res:
file_list.append(line_formated)
print(file_list)
print(len(file_list))
###获取类的集合###
down_list = []
for name in file_list:
dir_path = '/'.join(name.split('/')[:-1])
abs_dir_path = down_dir + dir_path
abs_file_path = abs_dir_path + name.split('/')[-1]
url = url_prefix + name
apk = APK(name, url, abs_dir_path, abs_file_path)
down_list.append(apk)
###线程/进程集合###
task = []
# for apk in down_list:
# p = Process(target=download_apk,args=(apk,))
# p.start()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as e:
for apk in down_list:
task.append(e.submit(download_apk, apk))
###读取返回值###
for i in task:
if i:
print(i)
本人编程小白,请教各位大佬,以下几个问题:
1、在线程/进程集合 部分,在运行脚本的时候,发现用进程下载文件 耗时比用线程池少且下载完全;而用线程池的下载文件的时候,发现文件下载不全,例如要下载30个文件,实际上只下载了20,这是为啥?
2、本人了解到,shell命令curl是采用多线程下载文件,在此代码中如果用curl 会有什么不妥嘛?不管多进程还是多线程。
因为python有GIL锁 你换个python解释器比如jpython就行了 好像是有个全局锁的问题? 对楼上的看法我不是很同意,GIL锁对 IO 密集型操作没有影响,和多进程效率差不多。在 CPU 密集型操作下,多进程和多线程的效率差距才会明显。
爬虫或下载属于 IO 密集型操作,差距不会明显,除非你的代码有问题。
至于为什么多线程没有下载完 30 个,是因为你 task.append 添加了任务,但没有等待任务完成。想要完成所有线程,需要在主线程阻塞等待所有线程。 用python进程池from multiprocessing import Pool thepoy 发表于 2021-6-4 12:51
对楼上的看法我不是很同意,GIL锁对 IO 密集型操作没有影响,和多进程效率差不多。在 CPU 密集型操作下,多 ...
首先,谢谢大佬
我用的线程池,线程池不是默认阻塞的嘛? import os
import re
import concurrent.futures
import datetime
import json
import subprocess
import logging
import re
from multiprocessing import Process
###定义类###
class APK:
def __init__(self, name, url, dir_path, file_path):
self.__name = name
self.__url = url
self.__dir_path = dir_path
self.__file_path = file_path
def get_name(self):
return self.__name
def get_url(self):
return self.__url
def get_dir_path(self):
return self.__dir_path
def get_file_path(self):
return self.__file_path
###下载模块###
def download_apk(apk):
try:
if 'make.file' in apk.get_url():
cmd = ["wget", token, "-O Android.mk", apk.get_url()]
else:
cmd = ["wget", token, apk.get_url()]
sp_output = None
if os.path.exists(apk.get_dir_path()):
os.chdir(apk.get_dir_path())
for i in ["app-release.apk", "Android.mk", "app-debug.apk"]:
if os.path.exists(i):
os.remove(i)
logging.info("%s start download ", apk.get_file_path())
logging.info("Download url is %s ",apk.get_url())
sp_output = subprocess.check_output(" ".join(cmd), shell=True, stderr=subprocess.PIPE)
logging.info("%s download successfully", apk.get_file_path())
else:
os.makedirs(apk.get_dir_path())
os.chdir(apk.get_dir_path())
logging.info("%s start download ", apk.get_file_path())
logging.info("Download url is %s ",apk.get_url())
sp_output = subprocess.check_output(" ".join(cmd), shell=True, stderr=subprocess.PIPE)
logging.info("%s download successfully", apk.get_file_path())
except subprocess.CalledProcessError as e:
logging.error(e)
print(sp_output)
print(e.output)
raise e
if __name__=='__main__':
log_file = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
logging.basicConfig(level=logging.INFO, filename=log_file, filemode='a', format='%(asctime)s : %(message)s')
url_prefix = 'http://ip:port/dir/apps'
down_dir = '/home/xxx/xxx'
token = "--user=xxx --password=xxx"
###处理字符串,返回下载列表###
file_list = []
with open('result.json','r') as f:
for number,line in enumerate(f,start=1):
if "V_Build" in line:
continue
if 'make.file' in line or 'apk' in line:
line_formated = line.split(':')[-1].replace('"','').replace(',','').replace('\n','').replace(' ','')
res = re.search(r"\d{4}(\-|\/|.)\d{1,2}\1\d{1,2}", line_formated)
if not res:
file_list.append(line_formated)
print(file_list)
print(len(file_list))
###获取类的集合###
down_list = []
for name in file_list:
dir_path = '/'.join(name.split('/')[:-1])
abs_dir_path = down_dir + dir_path
abs_file_path = abs_dir_path + name.split('/')[-1]
url = url_prefix + name
apk = APK(name, url, abs_dir_path, abs_file_path)
down_list.append(apk)
###线程/进程集合###
task = []
# for apk in down_list:
# p = Process(target=download_apk,args=(apk,))
# p.start()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as e:
for apk in down_list:
task.append(e.submit(download_apk, apk))
###读取返回值###
for i in task:
if i:
print(i)
代码格式化一下,大家看着可以方便点。 多些如果 发表于 2021-6-7 11:34
首先,谢谢大佬
我用的线程池,线程池不是默认阻塞的嘛?
submit 相当于丢给线程池一个在未来执行的任务。
如果你只丢任务,不管这些任务的执行状态,很容易出现部分线程未执行或未执行完成即退出主线程,结束程序。
你需要监听这些未来任务。
方法:
每个任务有一个 done 方法,意味着这个任务完成。
futures 包中也有一个 wait 方法,可以等待所有任务完成。
不过上面两个方法都是主线程主动查询子线程,这是没有必要的。
最好的办法是用 futures 包里的 as_completed 让子线程完成任务后主动通知主线程。
from concurrent.futures import as_completed
for task in as_completed(tasks):
data = task.result()
# 处理 data
页:
[1]