可迭代的多处理队列未退出

神人
import multiprocessing.queues as queues
import multiprocessing
class I(queues.Queue):
    def __init__(self, maxsize=0):
        super(I, self).__init__(maxsize)
        self.length = 0 

    def __iter__(self):
        return self

    def put(self, obj, block=True, timeout=None):
        super(I, self).put(obj,block,timeout)
        self.length += 1

    def get(self, block = True, timeout = None):
        self.length -= 1
        return super(I, self).get(block, timeout)

    def __len__(self):
        return self.length

    def next(self):
        item = self.get()
        if item == 'Done':
            raise StopIteration
        return item


def thisworker(item):
    print 'got this item: %s' % item
    return item

q=I()

q.put(1)
q.put('Done')

the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)

我正在尝试创建一个可迭代的队列以与多处理池映射一起使用。想法是该函数thisworker将某些项目追加到队列中,直到满足条件为止,然后在将“ Done”放入队列后退出(我在此代码中尚未完成此操作)

但是,此代码永远不会完成,总是会挂断。

我无法调试真正的原因。请求您的帮助

PS:我之所以使用过,self.length是因为map_async从under调用方法the_pool.map需要使用可迭代对象的长度来形成一个变量:chunksize,该变量将用于从池中获取任务。

损害

问题是您在'Done'中将其视为特殊情况Queue,这表明迭代应停止。因此,如果您在Queue示例中使用for循环进行迭代,则返回的全部是1但是,您声称的长度Queue为2。这使map代码搞砸了,该代码依赖于该长度来准确表示可迭代项中的项目数,以便知道何时从工作人员返回了所有结果。 :

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        ...
        # _number_left is used to know when the MapResult is done
        self._number_left = length//chunksize + bool(length % chunksize)

因此,您需要使长度实际准确。您可以通过以下几种方法来做到这一点,但我建议您完全不需要将前哨载入其中Queue,而应使用get_nowait

import multiprocessing.queues as queues
import multiprocessing
from Queue import Empty

class I(queues.Queue):
    def __init__(self, maxsize=0):
        super(I, self).__init__(maxsize)
        self.length = 0 

    ... <snip>

    def next(self):
        try:
            item = self.get_nowait()
        except Empty:
            raise StopIteration
        return item


def thisworker(item):
    print 'got this item: %s' % item
    return item

q=I()

q.put(1)

the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)

另外,请注意,这种方法并不安全。length仅当您仅从单个进程put进入Queue,然后put再将其发送Queue到辅助进程后再进入属性,属性才正确如果不调整导入和实现,它也将无法在Python 3中工作,因为的构造函数multiprocessing.queues.Queue已更改。

multiprocessing.queues.Queue我建议不要使用子类iter来遍历Queue

q = multiprocessing.Queue()
q.put(1)
q.put(2)
q.put(None)  # None is our sentinel, you could use 'Done', if you wanted
the_pool.map(thisworker, iter(q.get, None)) # This will call q.get() until None is returned

这将适用于所有版本的Python,代码少得多,并且是过程安全的。

编辑:

根据您在对我的答案的评论中提到的要求,我认为最好使用imap代替map,这样您根本不需要知道的长度Queue现实情况是,您无法准确确定该长度,实际上,长度可能最终会随着您的迭代而增长。如果您imap专门使用,则可以执行与原始方法类似的操作:

import multiprocessing

class I(object):
    def __init__(self, maxsize=0):
        self.q = multiprocessing.Queue(maxsize)

    def __getattr__(self, attr):
        if hasattr(self.q, attr):
            return getattr(self.q, attr)

    def __iter__(self):
        return self

    def next(self):
        item = self.q.get()
        if item == 'Done':
            raise StopIteration
        return item


def thisworker(item):
    if item == 1:
        q.put(3)
    if item == 2:
        q.put('Done')
    print 'got this item: %s' % item
    return item

q=I()

q.put(1)
q.put(2)
q.put(5)

the_pool = multiprocessing.Pool(2)  # 2 workers
print list(the_pool.imap(thisworker, q))

输出:

got this item: 1
got this item: 5
got this item: 3
got this item: 2
[1, 2, 5, 3]

我摆脱了担心长度的代码,并使用委托而不是继承来获得更好的Python 3.x兼容性。

请注意,iter(q.get, <sentinel>)只要您使用imap而不是,我使用的原始建议在这里仍然适用map

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章