本帖最后由 cococola 于 2022-11-30 12:02 编辑
PS:更正错误,本帖是多进程方式,原帖笔误写成了多线程。
一、前言
本贴用来记录并分享下学习多进程知识的要点,用一个小case实践一下。
这个案例的需求是:
使用多进程复制一个文件夹中的所有文件,注意这个文件夹中不能包含文件夹,只能是文件。其实如果包含文件夹也可以用递归来实现,这个我就先不在这里赘述了。
二、多进程的关键知识
案例分析:
1.多进程是实现多任务的一种方式,需要导入multiprocessing工具包。
2.创建一个类,并在初始化方法中定义源文件及其拷贝后目标文件的路径
[Python] 纯文本查看 复制代码 def __init__(self):
self.source_path = input("请输入需要拷贝的目标文件夹:\t")
self.source_list = os.listdir(self.source_path)
self.par_path = input("请输入目标路径:\t")
self.dir_name = self.source_path.split("/")[-1]
self.dest_path = self.par_path + "/" + self.dir_name + "[复件]"
if not os.path.exists(self.dest_path):
os.mkdir(self.dest_path)
因考虑到脚本位置不固定,所以这里最好使用绝对路径的方式;在定义完源文件路径后,遍历这个文件夹下的所有文件,后续会使用。
使用split函数分隔源文件路径最后的一部分,也就是分割出来源文件名称,这样以来我们就可以定义目标文件的路径
比如源文件叫music,那么目标文件路径就可以定义为:[目标路径]/[music 复件]
使用os.path.exists判断是否存在这个文件夹,不存在就创建
这样我们前期的准备工作就完事了
3.构建一个拷贝单文件的功能函数
[Python] 纯文本查看 复制代码 def copy(self, old_name):
old_path = self.source_path + "/" + old_name
new_path = self.dest_path + "/" + old_name
with open(old_path, "rb") as fr:
with open(new_path, "wb") as fw:
while True:
content = fr.read(1024)
if content:
fw.write(content)
else:
break
将需要拷贝的单文件名称以参数方式传递进去,并拼接好源文件及目标文件的路径。
这里的路径和类的初始化定义的时候是不同的,比如:我想copy的单文件原路径是:D:\test\python\music\晴天.mp3
那目标文件的路径:C:\Users\Admin\Desktop\music[复件]\晴天.mp3
4.使用多进程的方式启动
[Python] 纯文本查看 复制代码 def start(self):
po = multiprocessing.Pool(3)
for each in self.source_list:
po.apply_async(self.copy, (each, q))
po.close()
po.join()
创建一个进程池multiprocessing.Pool(),循环遍历源文件夹下的所有文件,并将这些文件交给进程池中等待有空闲进程去处理
进程池的语法:multiprocessing.Pool().apply_async(func, (*args))
这里需要注意的是:虽然所有的任务交给进程池,但是并不能保证子进程都已经执行完毕所有任务,因此,需要在主进程中加入join()函数
让主进程等待所有子进程完成所有任务后主进程再退出,否则主进程会直接退出。
5.附加功能:因主进程阻塞,在等待子进程执行任务的空闲,可以让主进程做一些有意义的事情,不让其只等待。
比如可以让主进程与所有的子进程进行通信,当某一个子进程做完某一个任务后,告诉给主进程,让主进程统计整个任务的执行进度。
这里引入另外一个知识点:进程间通信队列multiprocessing.Manger().QUEUE(),在多线程启动之前定义一个通信队列
当某一个子进程执行完毕一个任务后,使用q.put()将执行完毕的信息存放到队列中,然后主线程读取这个队列,知道子进程的完成情况,如此以来就可以
实现下载进度的显示。
当然,这里也可以使用套接字来实现子线程与主线程之间的通信,因在这个案例中使用通信队列就能实现功能并且较套接字更便捷,这里套接字实现就暂不考虑。
三、功能实现截图
四、程序源码
[Python] 纯文本查看 复制代码 #!/usr/bin/env/ python
# -*- coding:utf-8 -*-
# [url=home.php?mod=space&uid=686208]@AuThor[/url] : zxy
import multiprocessing
import os
import time
class MultiCopy(object):
def __init__(self):
self.source_path = input("请输入需要拷贝的目标文件夹:\t")
self.source_list = os.listdir(self.source_path)
self.par_path = input("请输入目标路径:\t")
self.dir_name = self.source_path.split("/")[-1]
self.dest_path = self.par_path + "/" + self.dir_name + "[复件]"
if not os.path.exists(self.dest_path):
os.mkdir(self.dest_path)
def copy(self, old_name, q):
old_path = self.source_path + "/" + old_name
new_path = self.dest_path + "/" + old_name
with open(old_path, "rb") as fr:
with open(new_path, "wb") as fw:
while True:
content = fr.read(1024)
if content:
fw.write(content)
else:
break
# print(f"{old_name}文件复制完成")
q.put(f"{old_name}")
def start(self):
po = multiprocessing.Pool(3)
q = multiprocessing.Manager().Queue()
for each in self.source_list:
po.apply_async(self.copy, (each, q))
po.close()
# po.join()
count_ok = 0
all_count = len(self.source_list)
while True:
q.get()
count_ok += 1
time.sleep(0.5)
# print(f"\r文件拷贝进度:{count_ok / all_count * 100:.2f}%", end="")
comp = count_ok / all_count * 100
comp_int = int(comp * 0.5)
print(f"\r文件拷贝进度:[{'#' * comp_int:<50}]{comp:.2f}%", end="")
if count_ok >= all_count:
break
print("")
if __name__ == '__main__':
mc = MultiCopy()
mc.start()
|