初学分布式多进程 运行程序报错
本帖最后由 paypojie 于 2022-9-29 18:55 编辑实现分布式多进程的代码报错
服务进程# 分布式多进程之实现服务进程
# 第一步导入相关模块
import random, time, queue # 导入随机 时间 队列 模块
from multiprocessing.managers import BaseManager # 从多进程模块中的managers子模块中导入BaseManager 作用: 创建多进程管理器
from multiprocessing import freeze_support
# 创建两个相关的队列对象 分别是任务队列和结果队列 使用构造方法创建queue对象
task_queue = queue.Queue()
result_queue = queue.Queue()
# 定义两个显式函数 函数返回值是指向queue对象的变量
def g_task_queue():
global task_queue # 我也不知道这里用全局变量是什么意思 好像不用这行代码也不影响
return task_queue # 返回变量task_queue
# 和上面函数的作用差不多
def g_result_queue():
global result_queue # 我也不知道这里用全局变量是什么意思 好像不用这行代码也不影响
return result_queue # 返回变量result_queue
class QueueManager(BaseManager): # 定义一个QueueManager 继承自BaseManager
pass
if __name__ == '__main__': # 加这行代码 要不然会报错
freeze_support()
# 把之前两个队列注册到网络上 callable参数关联queue对象 不能用匿名函数 要不然无法正常运行
QueueManager.register('get_task_queue', callable=g_task_queue)
QueueManager.register('get_result_queue', callable=g_result_queue)
# 绑定ip,端口,验证码
manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc')
# 启动manager
manager.start()
# 通过网络访问queue对象
task = manager.get_task_queue()
result1 = manager.get_result_queue()
# 将任务写入到task队列
for i in range(10):
n = random.randint(0,10000)
print('put task %d' % n)
time.sleep(1)
task.put(n)
# 关闭queue
manager.shutdown()
print('结束')
# print('结束') # 无用的代码 但是不明白为什么 如果执行会在编辑器终端开头处多输出一行结束语句
任务进程
import time, sys, queue
from multiprocessing.managers import BaseManager
# 定义一个和之前类似的BaseManager
class QueueManager(BaseManager):
pass
# 注册QueueManager 由于这个QueueManager只是从网络上获取queue 所以直接注册 提供名字就可以
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
server_addr = '127.0.0.1'
print('连接到 %s' % server_addr)
# 绑定ip和验证码
m = QueueManager(address=('127.0.0.1',5000),authkey=b'abc')
# 连接至网络
m.connect()
# 通过网络访问queue对象
task = m.get_task_queue()
result = m.get_result_queue()
# 读取task 并将读取的值放到result队列
for i in range(10):
try:
n = task.get(timeout=1)
print('%d * %d' % (n,n))
r = '%d * %d = %d' % (n,n,n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print('queue is empty')
print('工作任务结束')
报错截图:
cmd下执行一个python文件
报错原因就是这些 不明白为什么 求求大神指出解决方案
链接 https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600#0 教程评论不是有回复,master.py改成这个就行了
#master.py
import random,time,queue #导入随机,时间,队列模块
from multiprocessing.managers import BaseManager #导入多进程管理
from multiprocessing import freeze_support #网上看到的不知道干什么用,只知道是win10防出错
task_queue = queue.Queue()#实例化队列为任务队列
result_queue = queue.Queue()#实例化队列为结果队列
def return_task_queue():#win下queuemanager注册到网络关联队列不能用lambda,所以自定义一个函数用于关联
global task_queue
return task_queue
def return_result_queue():#win下queuemanager注册到网络关联队列不能用lambda,所以自定义一个函数用于关联
global result_queue
return result_queue
class QueueManager(BaseManager): #定义QueueManager继承BaseManager
pass
def test():
QueueManager.register('get_task_queue',callable=return_task_queue) #注册任务队列到网络
QueueManager.register('get_result_queue',callable=return_result_queue) #注册结果队列到网络
manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc') #创建QueueManager实例,绑定ip、验证码
manager.start() #打开服务器
task = manager.get_task_queue() #通过QueueManager获取任务队列,避免绕过封装
result = manager.get_result_queue() #同上
for i in range(10):
n = random.randint(0,10000)#生成任务数字(随机数)
print('Put task %d...' % n)
task.put(n) #将任务数添加到任务队列
print('Try get result...')
try:
for i in range(10):
r = result.get(timeout=10)#监听结果队列,获取结果数
print('Result:%s' % r)
except queue.Empty:
print('queue is empty') #调试空队列错误
finally:
manager.shutdown() #不管前面代码是否有误,只要manager.start(),必须关闭,不然再次打开会报错
print('master exit.')
if __name__ == '__main__':
freeze_support()
print('Start!')
test() 本帖最后由 paypojie 于 2022-9-28 23:12 编辑
小黑屋 发表于 2022-9-28 20:43
教程评论不是有回复,master.py改成这个就行了
#master.py
谢谢这么用心但还是会报错
{:301_972:}
我感觉是我电脑的问题 我复制粘贴评论区的代码 然后运行 但还是报一样的错误{:301_972:} paypojie 发表于 2022-9-28 23:20
我感觉是我电脑的问题 我复制粘贴评论区的代码 然后运行 但还是报一样的错误
我刚才用你和评论回复的代码,提示一样:ConnectionRefusedError: 由于目标计算机积极拒绝,无法连接。 lengkeyu 发表于 2022-9-29 09:12
我刚才用你和评论回复的代码,提示一样:ConnectionRefusedError: 由于目标计算机积极 ...
出错的地方在:
print('Run task %d*%d'%(n,n))
#print('n * n' % (n,n))这种写法,运行时候会提示参数初始化的错误。 lengkeyu 发表于 2022-9-29 09:41
出错的地方在:
print('Run task %d*%d'%(n,n))
#print('n * n' % (n,n))这 ...
还是难以解决这个分布式进程跳过算了
页:
[1]