甜萝 发表于 2022-9-28 19:20

初学分布式多进程 运行程序报错

本帖最后由 paypojie 于 2022-9-29 18:55 编辑

                                                             实现分布式多进程的代码报错


服务进程# 分布式多进程之实现服务进程

# 第一步导入相关模块
import random, time, queue      # 导入随机 时间 队列 模块
from multiprocessing.managers import BaseManager      # 从多进程模块中的managers子模块中导入BaseManager 作用: 创建多进程管理器
from multiprocessing import freeze_support
# 创建两个相关的队列对象 分别是任务队列和结果队列 使用构造方法创建queue对象
task_queue = queue.Queue()
result_queue = queue.Queue()

# 定义两个显式函数 函数返回值是指向queue对象的变量
def g_task_queue():
    global task_queue       # 我也不知道这里用全局变量是什么意思 好像不用这行代码也不影响
    return task_queue       # 返回变量task_queue

# 和上面函数的作用差不多
def g_result_queue():
    global result_queue         # 我也不知道这里用全局变量是什么意思 好像不用这行代码也不影响
    return result_queue         # 返回变量result_queue

class QueueManager(BaseManager):      # 定义一个QueueManager 继承自BaseManager
    pass

if __name__ == '__main__':      # 加这行代码 要不然会报错
    freeze_support()
    # 把之前两个队列注册到网络上 callable参数关联queue对象 不能用匿名函数 要不然无法正常运行
    QueueManager.register('get_task_queue', callable=g_task_queue)
    QueueManager.register('get_result_queue', callable=g_result_queue)

    # 绑定ip,端口,验证码
    manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc')

    # 启动manager
    manager.start()

    # 通过网络访问queue对象
    task = manager.get_task_queue()
    result1 = manager.get_result_queue()

    # 将任务写入到task队列
    for i in range(10):
      n = random.randint(0,10000)
      print('put task %d' % n)
      time.sleep(1)
      task.put(n)

      # 关闭queue
    manager.shutdown()
    print('结束')
# print('结束') # 无用的代码 但是不明白为什么 如果执行会在编辑器终端开头处多输出一行结束语句
任务进程
import time, sys, queue
from multiprocessing.managers import BaseManager

# 定义一个和之前类似的BaseManager
class QueueManager(BaseManager):   
    pass

# 注册QueueManager 由于这个QueueManager只是从网络上获取queue 所以直接注册 提供名字就可以
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

server_addr = '127.0.0.1'
print('连接到 %s' % server_addr)

# 绑定ip和验证码
m = QueueManager(address=('127.0.0.1',5000),authkey=b'abc')

# 连接至网络
m.connect()

# 通过网络访问queue对象
task = m.get_task_queue()
result = m.get_result_queue()

# 读取task 并将读取的值放到result队列
for i in range(10):
    try:
      n = task.get(timeout=1)
      print('%d * %d' % (n,n))
      r = '%d * %d = %d' % (n,n,n*n)
      time.sleep(1)
      result.put(r)
    except queue.Empty:
      print('queue is empty')

print('工作任务结束')
报错截图:



cmd下执行一个python文件



报错原因就是这些 不明白为什么 求求大神指出解决方案
链接 https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600#0

小黑屋 发表于 2022-9-28 20:43

教程评论不是有回复,master.py改成这个就行了
#master.py

import random,time,queue #导入随机,时间,队列模块
from multiprocessing.managers import BaseManager #导入多进程管理
from multiprocessing import freeze_support #网上看到的不知道干什么用,只知道是win10防出错

task_queue = queue.Queue()#实例化队列为任务队列
result_queue = queue.Queue()#实例化队列为结果队列

def return_task_queue():#win下queuemanager注册到网络关联队列不能用lambda,所以自定义一个函数用于关联
    global task_queue
    return task_queue

def return_result_queue():#win下queuemanager注册到网络关联队列不能用lambda,所以自定义一个函数用于关联
    global result_queue
    return result_queue

class QueueManager(BaseManager): #定义QueueManager继承BaseManager
    pass

def test():
    QueueManager.register('get_task_queue',callable=return_task_queue)   #注册任务队列到网络
    QueueManager.register('get_result_queue',callable=return_result_queue)   #注册结果队列到网络

    manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc')   #创建QueueManager实例,绑定ip、验证码

    manager.start()   #打开服务器

    task = manager.get_task_queue()   #通过QueueManager获取任务队列,避免绕过封装
    result = manager.get_result_queue() #同上

    for i in range(10):
      n = random.randint(0,10000)#生成任务数字(随机数)
      print('Put task %d...' % n)
      task.put(n)                  #将任务数添加到任务队列

    print('Try get result...')

    try:
      for i in range(10):         
            r = result.get(timeout=10)#监听结果队列,获取结果数
            print('Result:%s' % r)
    except queue.Empty:
      print('queue is empty')         #调试空队列错误
    finally:
      manager.shutdown()            #不管前面代码是否有误,只要manager.start(),必须关闭,不然再次打开会报错
      print('master exit.')


if __name__ == '__main__':
    freeze_support()
    print('Start!')
    test()

甜萝 发表于 2022-9-28 23:05

本帖最后由 paypojie 于 2022-9-28 23:12 编辑

小黑屋 发表于 2022-9-28 20:43
教程评论不是有回复,master.py改成这个就行了
#master.py


谢谢这么用心但还是会报错

                                                                                                                           {:301_972:}

甜萝 发表于 2022-9-28 23:20

我感觉是我电脑的问题 我复制粘贴评论区的代码 然后运行 但还是报一样的错误{:301_972:}

lengkeyu 发表于 2022-9-29 09:12

paypojie 发表于 2022-9-28 23:20
我感觉是我电脑的问题 我复制粘贴评论区的代码 然后运行 但还是报一样的错误

我刚才用你和评论回复的代码,提示一样:ConnectionRefusedError: 由于目标计算机积极拒绝,无法连接。

lengkeyu 发表于 2022-9-29 09:41

lengkeyu 发表于 2022-9-29 09:12
我刚才用你和评论回复的代码,提示一样:ConnectionRefusedError: 由于目标计算机积极 ...

出错的地方在:
            print('Run task %d*%d'%(n,n))
            #print('n * n' % (n,n))这种写法,运行时候会提示参数初始化的错误。

甜萝 发表于 2022-9-29 12:42

lengkeyu 发表于 2022-9-29 09:41
出错的地方在:
            print('Run task %d*%d'%(n,n))
            #print('n * n' % (n,n))这 ...

还是难以解决这个分布式进程跳过算了
页: [1]
查看完整版本: 初学分布式多进程 运行程序报错