import multiprocessing.queues as queues
import multiprocessing
class I(queues.Queue):
def __init__(self, maxsize=0):
super(I, self).__init__(maxsize)
self.length = 0
def __iter__(self):
return self
def put(self, obj, block=True, timeout=None):
super(I, self).put(obj,block,timeout)
self.length += 1
def get(self, block = True, timeout = None):
self.length -= 1
return super(I, self).get(block, timeout)
def __len__(self):
return self.length
def next(self):
item = self.get()
if item == 'Done':
raise StopIteration
return item
def thisworker(item):
print 'got this item: %s' % item
return item
q=I()
q.put(1)
q.put('Done')
the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)
我正在尝试创建一个可迭代的队列以与多处理池映射一起使用。想法是该函数thisworker
将某些项目追加到队列中,直到满足条件为止,然后在将“ Done”放入队列后退出(我在此代码中尚未完成此操作)
但是,此代码永远不会完成,总是会挂断。
我无法调试真正的原因。请求您的帮助
PS:我之所以使用过,self.length
是因为map_async
从under调用的方法the_pool.map
需要使用可迭代对象的长度来形成一个变量:chunksize
,该变量将用于从池中获取任务。
问题是您在'Done'
中将其视为特殊情况Queue
,这表明迭代应停止。因此,如果您在Queue
示例中使用for循环进行迭代,则返回的全部是1
。但是,您声称的长度Queue
为2。这使map
代码搞砸了,该代码依赖于该长度来准确表示可迭代项中的项目数,以便知道何时从工作人员返回了所有结果。 :
class MapResult(ApplyResult):
def __init__(self, cache, chunksize, length, callback):
ApplyResult.__init__(self, cache, callback)
...
# _number_left is used to know when the MapResult is done
self._number_left = length//chunksize + bool(length % chunksize)
因此,您需要使长度实际准确。您可以通过以下几种方法来做到这一点,但我建议您完全不需要将前哨载入其中Queue
,而应使用get_nowait
:
import multiprocessing.queues as queues
import multiprocessing
from Queue import Empty
class I(queues.Queue):
def __init__(self, maxsize=0):
super(I, self).__init__(maxsize)
self.length = 0
... <snip>
def next(self):
try:
item = self.get_nowait()
except Empty:
raise StopIteration
return item
def thisworker(item):
print 'got this item: %s' % item
return item
q=I()
q.put(1)
the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)
另外,请注意,这种方法并不安全。length
仅当您仅从单个进程put
进入Queue
,然后put
再将其发送Queue
到辅助进程后再进入该属性,该属性才正确。如果不调整导入和实现,它也将无法在Python 3中工作,因为的构造函数multiprocessing.queues.Queue
已更改。
multiprocessing.queues.Queue
我建议不要使用子类iter
来遍历Queue
:
q = multiprocessing.Queue()
q.put(1)
q.put(2)
q.put(None) # None is our sentinel, you could use 'Done', if you wanted
the_pool.map(thisworker, iter(q.get, None)) # This will call q.get() until None is returned
这将适用于所有版本的Python,代码少得多,并且是过程安全的。
编辑:
根据您在对我的答案的评论中提到的要求,我认为最好使用imap
代替map
,这样您根本不需要知道的长度Queue
。现实情况是,您无法准确确定该长度,实际上,长度可能最终会随着您的迭代而增长。如果您imap
专门使用,则可以执行与原始方法类似的操作:
import multiprocessing
class I(object):
def __init__(self, maxsize=0):
self.q = multiprocessing.Queue(maxsize)
def __getattr__(self, attr):
if hasattr(self.q, attr):
return getattr(self.q, attr)
def __iter__(self):
return self
def next(self):
item = self.q.get()
if item == 'Done':
raise StopIteration
return item
def thisworker(item):
if item == 1:
q.put(3)
if item == 2:
q.put('Done')
print 'got this item: %s' % item
return item
q=I()
q.put(1)
q.put(2)
q.put(5)
the_pool = multiprocessing.Pool(2) # 2 workers
print list(the_pool.imap(thisworker, q))
输出:
got this item: 1
got this item: 5
got this item: 3
got this item: 2
[1, 2, 5, 3]
我摆脱了担心长度的代码,并使用委托而不是继承来获得更好的Python 3.x兼容性。
请注意,iter(q.get, <sentinel>)
只要您使用imap
而不是,我使用的原始建议在这里仍然适用map
。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句