在使用多处理队列在进程之间进行通信的情况下,许多文章建议向队列发送终止消息。但是,如果子进程是生产者,则可能会失败,从而使消费者没有收到通知以期待更多消息。
但是,如果子进程死亡,则可以通知父进程。它似乎应该可以通知此过程中的工作线程退出并且不期望更多消息。但是怎么做?
multiprocessing.Queue.close()
...不通知消费者(真的吗?等等?什么!)
def onProcessQuit(): # Notify worker that we are done.
messageQ.put("TERMINATE")
...不让我等待待处理的工作完成。
def onProcessQuit(): # Notify worker that we are done.
messageQ.put("TERMINATE")
# messageQ.close()
messageQ.join_thread() # Wait for worker to complete
...失败,因为队列尚未关闭。
def onProcessQuit(): # Notify worker that we are done.
messageQ.put("TERMINATE")
messageQ.close()
messageQ.join_thread() # Wait for worker to complete
...似乎它应该可以工作,但在工作人员中失败并出现 TypeError 异常:
msg = messageQ.get()
File "/usr/lib/python3.7/multiprocessing/queues.py", line 94, in get
res = self._recv_bytes()
File "/usr/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.7/multiprocessing/connection.py", line 411, in _recv_bytes
return self._recv(size)
File "/usr/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
while !quit:
try:
msg = messageQ.get(block=True, timeout=0.5)
except Empty:
continue
... 很糟糕,因为它不必要地要求交易关闭延迟而不限制 CPU。
完整示例
import multiprocessing
import threading
def producer(messageQ):
messageQ.put("1")
messageQ.put("2")
messageQ.put("3")
if __name__ == '__main__':
messageQ = multiprocessing.Queue()
def worker():
try:
while True:
msg = messageQ.get()
print(msg)
if msg=="TERMINATE": return
# messageQ.task_done()
finally:
print("Worker quit")
# messageQ.close() # End thread
# messageQ.join_thread()
thr = threading.Thread(target=worker,
daemon=False) # The work queue is precious.
thr.start()
def onProcessQuit(): # Notify worker that we are done.
messageQ.put("TERMINATE") # Notify worker we are done
messageQ.close() # No more messages
messageQ.join_thread() # Wait for worker to complete
def runProcess():
proc = multiprocessing.Process(target=producer, args=(messageQ,))
proc.start()
proc.join()
print("runProcess quitting ...")
onProcessQuit()
print("runProcess quitting .. OK")
runProcess()
如果您担心该producer
过程无法正常完成,那么我不确定您的问题是什么,因为您的代码应该可以正常工作,除了一些更正:(1)它缺少一个import
声明,(2)没有调用runProcess
(3) 您的工作线程错误地是一个守护线程(因此它可能在有机会处理队列中的所有消息之前就终止了)。
我还将用作个人偏好(而不是更正)None
作为特殊的哨兵消息,而不是TERMINATE
删除一些您并不真正需要的无关队列调用(我没有看到您明确关闭队列完成任何必要的事情)。
这些是变化:
def producer(messageQ):
messageQ.put("1")
messageQ.put("2")
messageQ.put("3")
if __name__ == '__main__':
import multiprocessing
import threading
SENTINEL = None
def worker():
try:
while True:
msg = messageQ.get()
if msg is SENTINEL:
return # No need to print the sentinel
print(msg)
finally:
print("Worker quit")
def onProcessQuit(): # Notify worker that we are done.
messageQ.put(SENTINEL) # Notify worker we are done
def runProcess():
proc = multiprocessing.Process(target=producer, args=(messageQ,))
proc.start()
proc.join()
print("runProcess quitting ...")
onProcessQuit()
print("runProcess quitting .. OK")
thr.join()
messageQ = multiprocessing.Queue()
thr = threading.Thread(target=worker) # The work queue is precious.
thr.start()
runProcess()
印刷:
1
2
3
runProcess quitting ...
runProcess quitting .. OK
Worker quit
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句