多任务并行处理列表工具
本帖最后由 soenluzy 于 2022-1-30 00:21 编辑本人纯python新手,由于有需要处理大量列表数据的问题,写了个简单的多线程处理工具类,有大佬觉得可以优化的地方,可以评论指正
(以下根据评论区意见做了修改)
代码如下:
import multiprocessing as mp
class ForkJoinTool:
"""多任务并行处理列表数据工具
当列表过大时,可将列表拆分多个list,并调用传入函数(func)并行处理,并返回合并结果
Attributes:
func: 处理list的函数
itData: 需要处理的列表数据(list).
size: 指定每个任务处理的条数,可为空,默认100
"""
def __init__(self, func, itData,size):
self.func = func
self.optList = itData
self.size = 100
if (not size is None and size > 0):
self.size = size
if (len(itData) > self.size):
self.optList = self.list_of_groups(itData, self.size)
def split(self, size):
"""指定每个任务处理的条数,并根据size来拆分列表
:param size 指定每个任务处理的条数,可为空,默认100
"""
if( not size is None and size > 0):
self.size = size
if( len(self.itData) > self.size):
self.optList = self.list_of_groups(self.itData, self.size)
def run(self):
with mp.Pool(mp.cpu_count()) as pool:
self.resultList = pool.starmap(self.func, zip(self.optList))
return self
def join(self):
newList = []
for i, v in enumerate(self.resultList):
newList = newList + v
return newList
def list_of_groups(self, init_list, children_list_len):
"""将list拆分为指定长度的子list
:param init_list 指定list,如:
:param children_list_len 指定长度,如:3
:return 返回拆分后的结果list,如:[, ]
"""
list_of_groups = zip(*(iter(init_list),) * children_list_len)
end_list =
count = len(init_list) % children_list_len
end_list.append(init_list[-count:]) if count != 0 else end_list
return end_list
# 以下为演示代码,可删除
def func(list):
alist = []
for i, e in enumerate(list):
if( e%2 == 0 ):
alist.append(e)
return alist
if __name__ == '__main__':
code_list =
r = ForkJoinTool(func,code_list,3).run().join()
print(r)
# 结果为
没必要上多进程 IO密集型 普通多线程就行 一直想学却一直学不起来 我个人的建议去掉split接口,size由构造函数接受,同时在构造中分组。
启动的话要么在构造中启动,要么再暴露一个start接口,在join中启动始终觉得怪怪的...
同时同意楼上的多线程建议,进程开销远比线程大... 幽溪左畔 发表于 2022-1-28 18:14
没必要上多进程 IO密集型 普通多线程就行
好的,我看看 unmask 发表于 2022-1-28 19:48
我个人的建议去掉split接口,size由构造函数接受,同时在构造中分组。
启动的话要么在构造中启动,要么再 ...
可以,我改一下
页:
[1]