我正在编写一个程序,其中可变数量的Agent对象同时运行多个串行方法,并将它们的返回值存储在queue属性中。每个代理都具有单个Worker(Process的子类)作为属性,并为其提供作业以通过cmd_queue串行运行。代理在res_queue中从其Worker获取结果。这些是当前的Manager()。Queue()实例,其原因是:TypeError: Pickling an AuthenticationString object is disallowed for security reasons
但是,如果我使用常规的Queue.Queue,则工作线程会获得代理的cmd_queue的副本,并且看不到代理向其添加的内容(始终为空)。
我可以使用此问题中引用的解决方案来腌制实例方法:使用python的multiprocessing Pool.map()时不能腌制<type'instancemethod'>
from multiprocessing import Manager, Process
from time import sleep
import copy_reg
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method
class Worker(Process):
def __init__(self, cmd_queue, res_queue):
self.cmd_queue = cmd_queue
self.res_queue = res_queue
Process.__init__(self)
def run(self):
while True:
f, args, kwargs = self.cmd_queue.get()
self.res_queue.put( f(*args, **kwargs) )
class Agent:
def __init__(self):
self.cmd_queue = Manager().Queue()
self.res_queue = Manager().Queue()
self.worker = Worker(self.cmd_queue, self.res_queue)
self.worker.start()
def produce(self, f, *args, **kwargs):
self.cmd_queue.put((f, args, kwargs))
def do_some_work(self):
self.produce(self.foo, waka='waka')
def do_some_other_work(self):
self.produce(self.bar, humana='humana')
def foo(self, **kwargs):
sleep(5)
return('this is a foo')
def bar(self, **kwargs):
sleep(10)
return('this is a bar')
def get_results(self): #blocking call
res = []
while not self.cmd_queue.empty():#wait for Worker to finish
sleep(.5)
while not self.res_queue.empty():
res.append(self.res_queue.get())
return res
#This is the interface I'm looking for.
if __name__=='__main__':
agents = [Agent() for i in range(50)]
#this should flow quickly as the calls are added to cmd_queues
for agent in agents:
agent.do_some_work()
agent.do_some_other_work()
for agent in agents:
print(agent.get_results())
我的问题是,如何使用多处理程序来使此代码正常工作?或者是否有更好的方法被更广泛接受?这只是较大框架的一小部分,因此我希望它尽可能地面向对象。
编辑:这是在python 2.7中。
您可以使用普通的进行此操作multiprocessing.Queue
。您只需要调整Agent
类,以免Queue
在对Agent
类本身进行酸洗时不对实例进行酸洗。这是必需的,因为Agent
当您腌制要发送给的实例方法时,必须腌制实例本身Worker
。不过,这样做很容易:
class Agent(object): # Agent is now a new-style class
def __init__(self):
self.cmd_queue = Queue()
self.res_queue = Queue()
self.worker = Worker(self.cmd_queue, self.res_queue)
self.worker.start()
def __getstate__(self):
""" This is called to pickle the instance """
self_dict = self.__dict__.copy()
del self_dict['cmd_queue']
del self_dict['res_queue']
del self_dict['worker']
return self_dict
def __setstate__(self, self_dict):
""" This is called to unpickle the instance. """
self.__dict__ = self_dict
... # The rest is the same.
请注意,此代码中还有一些其他逻辑问题使它无法正常运行。get_results
并没有真正按照您的预期去做,因为这容易受到比赛条件的影响:
while not self.cmd_queue.empty():#wait for Worker to finish
sleep(.5)
while not self.res_queue.empty():
res.append(self.res_queue.get())
cmd_queue
在您实际传递给它的函数在内部完成运行之前,可能(并且确实使用示例代码)最终为空Worker
,这意味着当您将所有内容从中拉出时,某些结果将丢失res_queue
。您可以使用来解决此问题JoinableQueue
,它可以使工作人员在完成工作时实际发出信号。
您还应该将哨兵发送给工作进程,以使其正常关闭,并使所有结果都正确地从res_queue
父进程清除并发送回父进程。我还发现我需要在上添加一个哨兵res_queue
,否则有时res_queue
在从孩子那里写入到它的最后结果实际上在管道中被冲洗之前,在父级中会显示为空,这意味着最后的结果会丢失。
这是一个完整的工作示例:
from multiprocessing import Process, Queue, JoinableQueue
import types
from time import sleep
import copy_reg
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
class Worker(Process):
def __init__(self, cmd_queue, res_queue):
self.cmd_queue = cmd_queue
self.res_queue = res_queue
Process.__init__(self)
def run(self):
for f, args, kwargs in iter(self.cmd_queue.get,
(None, (), {})): # None is our sentinel
self.res_queue.put( f(*args, **kwargs) )
self.cmd_queue.task_done() # Mark the task as done.
self.res_queue.put(None) # Send this to indicate no more results are coming
self.cmd_queue.task_done() # Mark the task as done
class Agent(object):
def __init__(self):
self.cmd_queue = JoinableQueue()
self.res_queue = Queue()
self.worker = Worker(self.cmd_queue, self.res_queue)
self.worker.start()
def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['cmd_queue']
del self_dict['res_queue']
del self_dict['worker']
return self_dict
def __setstate__(self, self_dict):
self.__dict__ = self_dict
def produce(self, f, *args, **kwargs):
self.cmd_queue.put((f, args, kwargs))
def do_some_work(self):
self.produce(self.foo, waka='waka')
def do_some_other_work(self):
self.produce(self.bar, humana='humana')
def send_sentinel(self):
self.produce(None)
def foo(self, **kwargs):
sleep(2)
return('this is a foo')
def bar(self, **kwargs):
sleep(4)
return('this is a bar')
def get_results(self): #blocking call
res = []
self.cmd_queue.join() # This will block until task_done has been called for every put pushed into the queue.
for out in iter(self.res_queue.get, None): # None is our sentinel
res.append(out)
return res
#This is the interface I'm looking for.
if __name__=='__main__':
agents = [Agent() for i in range(50)]
#this should flow quickly as the calls are added to cmd_queues
for agent in agents:
agent.do_some_work()
agent.do_some_other_work()
agent.send_sentinel()
for agent in agents:
print(agent.get_results())
输出:
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句