本帖最后由 soenluzy 于 2022-1-30 00:21 编辑
本人纯python新手,由于有需要处理大量列表数据的问题,写了个简单的多线程处理工具类,有大佬觉得可以优化的地方,可以评论指正
(以下根据评论区意见做了修改)
代码如下:
[Python] 纯文本查看 复制代码 import multiprocessing as mp
class ForkJoinTool:
"""多任务并行处理列表数据工具
当列表过大时,可将列表拆分多个list,并调用传入函数(func)并行处理,并返回合并结果
Attributes:
func: 处理list的函数
itData: 需要处理的列表数据(list).
size: 指定每个任务处理的条数,可为空,默认100
"""
def __init__(self, func, itData, size):
self.func = func
self.optList = itData
self.size = 100
if (not size is None and size > 0):
self.size = size
if (len(itData) > self.size):
self.optList = self.list_of_groups(itData, self.size)
def split(self, size):
"""指定每个任务处理的条数,并根据size来拆分列表
:param size 指定每个任务处理的条数,可为空,默认100
"""
if( not size is None and size > 0):
self.size = size
if( len(self.itData) > self.size):
self.optList = self.list_of_groups(self.itData, self.size)
def run(self):
with mp.Pool(mp.cpu_count()) as pool:
self.resultList = pool.starmap(self.func, zip(self.optList))
return self
def join(self):
newList = []
for i, v in enumerate(self.resultList):
newList = newList + v
return newList
def list_of_groups(self, init_list, children_list_len):
"""将list拆分为指定长度的子list
:param init_list 指定list,如:[3, 6, 32, 65, 1, 4]
:param children_list_len 指定长度,如:3
:return 返回拆分后的结果list,如:[[3, 6, 32], [65, 1, 4]]
"""
list_of_groups = zip(*(iter(init_list),) * children_list_len)
end_list = [list(i) for i in list_of_groups]
count = len(init_list) % children_list_len
end_list.append(init_list[-count:]) if count != 0 else end_list
return end_list
# 以下为演示代码,可删除
def func(list):
alist = []
for i, e in enumerate(list):
if( e%2 == 0 ):
alist.append(e)
return alist
if __name__ == '__main__':
code_list = [3, 6, 32, 64, 1, 4, 10, 11, 14, 35, 48]
r = ForkJoinTool(func,code_list,3).run().join()
print(r)
# 结果为 [6, 32, 64, 4, 10, 14, 48]
|