本帖最后由 LoveCode 于 2023-9-5 12:13 编辑
我试着使用进程锁也无法解决问题,那么很可能是底层出了问题。
然后去查看了 Pandas 的 to_hdf 文档,知道了它基于 PyTables 库。
最终在 PyTables 的 FAQ 中找到了答案:在多线程、多进程的情况下,读取内容是没有问题的,但写入内容会出问题!
链接在这里:https://www.pytables.org/FAQ.html ,里面提到了解决办法。
* * * 后续补充 * * *
回想起来意识到了我的失误:
Pandas 使用了 PyTables ,现在 PyTables 不支持多进程写内容,自然影响到了 Pandas 的 to_hdf 方法。
- 即便解决了
PyTables 的多进程问题,最后还是要回到 Pandas 上,它内部使用了 PyTables ,总不能去修改 Pandas 的源码。
于是我仔细看了上述 PyTables 文档提到的四种解决方案,简单概括就是实现多进程的通信。根据文章的提示,
它有一个测试文件 example/multiprocess_access_benchmarks.py ,这里上 Github 项目,查看对应目录中的测试文件,里面展示了上述四种方案。
那么现在对 Pandas 的使用来说,核心问题在于:不能有多个进程来写入 HDF 文件,那么我能想到的是如下方案:
- 单独启动一个
write_to_hdf 进程,它从管道、共享队列等中读取内容并且写入 HDF 文件,并且只有它能写入内容。
- 其它进程则将数据通过管道、共享队列等发送给
write_to_hdf 进程,这样就避免了多进程写 HDF 文件的问题。
现在修改 write.py 如下,因我使用的是共享队列。
import multiprocessing as mp
import pandas as pd
import numpy as np
def write_to_hdf(hdf_filename: str, q: mp.Queue):
while True:
df = q.get()
# 取出队列中的 DataFrame,然后再写入文件
df.to_hdf(hdf_filename, key="abc", append=True, format="table")
print("测试: 成功写入")
def w_in(q: mp.Queue):
print("w_in 启动")
df = pd.DataFrame(np.random.randint(0, 100, size=(100, 4)), columns=list("ABCD"))
# 这里直接将 df 传入队列
q.put(df)
if __name__ == "__main__":
# 队列中的数据类型是 pd.DataFrame
data_queue = mp.Queue(maxsize=10)
mp.Process(target=write_to_hdf, args=("data.h5", data_queue)).start()
p1 = mp.Process(target=w_in, name="p1", args=(data_queue,))
p2 = mp.Process(target=w_in, name="p2", args=(data_queue,))
p3 = mp.Process(target=w_in, name="p3", args=(data_queue,))
mp_list = [p1, p2, p3]
for t in mp_list:
t.start()
for t in mp_list:
t.join()
while True:
if not p1.is_alive():
p1 = mp.Process(target=w_in, name="p1", args=(data_queue,))
p1.start()
if not p2.is_alive():
p2 = mp.Process(target=w_in, name="p2", args=(data_queue,))
p2.start()
if not p3.is_alive():
p3 = mp.Process(target=w_in, name="p3", args=(data_queue,))
p3.start()
for t in mp_list:
t.join()
测试写入 HDF 文件的代码结果如下:
顺便测试了一下读取 HDF 文件的代码也是没有问题的。
|