如何从父(消费者)进程关闭多处理队列

用户48956

在使用多处理队列在进程之间进行通信的情况下,许多文章建议向队列发送终止消息。但是,如果子进程是生产者,则可能会失败,从而使消费者没有收到通知以期待更多消息。

但是,如果子进程死亡,则可以通知父进程。它似乎应该可以通知此过程中的工作线程退出并且不期望更多消息。但是怎么做?

    multiprocessing.Queue.close()

...不通知消费者(真的吗?等等?什么!)

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE")

...不让我等待待处理的工作完成。

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE")
        # messageQ.close()
        messageQ.join_thread()  # Wait for worker to complete

...失败,因为队列尚未关闭。

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE")
        messageQ.close()
        messageQ.join_thread()  # Wait for worker to complete

...似乎它应该可以工作,但在工作人员中失败并出现 TypeError 异常:

    msg = messageQ.get()
  File "/usr/lib/python3.7/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 411, in _recv_bytes
    return self._recv(size)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
            while !quit:
                try:
                   msg = messageQ.get(block=True, timeout=0.5)
                except Empty:
                   continue

... 很糟糕,因为它不必要地要求交易关闭延迟而不限制 CPU。

完整示例

import multiprocessing
import threading


def producer(messageQ):
    messageQ.put("1")
    messageQ.put("2")
    messageQ.put("3")

if __name__ == '__main__':
    messageQ = multiprocessing.Queue()
    def worker():
        try:
            while True:
                msg = messageQ.get()
                print(msg)
                if msg=="TERMINATE": return
                # messageQ.task_done()
        finally:
            print("Worker quit")
            # messageQ.close()  # End thread
            # messageQ.join_thread()

    thr = threading.Thread(target=worker, 
                           daemon=False)   # The work queue is precious.
    thr.start()

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE") # Notify worker we are done
        messageQ.close()        # No more messages
        messageQ.join_thread()  # Wait for worker to complete

    def runProcess():
        proc = multiprocessing.Process(target=producer, args=(messageQ,))
        proc.start()
        proc.join()
        print("runProcess quitting ...")
        onProcessQuit()
        print("runProcess quitting .. OK")

    runProcess()

如果您担心该producer过程无法正常完成,那么我不确定您的问题是什么,因为您的代码应该可以正常工作,除了一些更正:(1)它缺少一个import声明,(2)没有调用runProcess(3) 您的工作线程错误地是一个守护线程(因此它可能在有机会处理队列中的所有消息之前就终止了)。

我还将用作个人偏好(而不是更正)None作为特殊的哨兵消息,而不是TERMINATE删除一些您并不真正需要的无关队列调用(我没有看到您明确关闭队列完成任何必要的事情)。

这些是变化:

def producer(messageQ):
    messageQ.put("1")
    messageQ.put("2")
    messageQ.put("3")

if __name__ == '__main__':
    import multiprocessing
    import threading

    SENTINEL = None

    def worker():
        try:
            while True:
                msg = messageQ.get()
                if msg is SENTINEL:
                    return # No need to print the sentinel
                print(msg)
        finally:
            print("Worker quit")


    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put(SENTINEL) # Notify worker we are done

    def runProcess():
        proc = multiprocessing.Process(target=producer, args=(messageQ,))
        proc.start()
        proc.join()
        print("runProcess quitting ...")
        onProcessQuit()
        print("runProcess quitting .. OK")
        thr.join()

    messageQ = multiprocessing.Queue()
    thr = threading.Thread(target=worker)   # The work queue is precious.
    thr.start()
    runProcess()

印刷:

1
2
3
runProcess quitting ...
runProcess quitting .. OK
Worker quit

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章