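This code tries to use a queue to feed tasks to several worker processes.
I want to time the speed difference between different numbers of processes and different ways of processing the data.
However, the output is not what I expected.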
from multiprocessing import Process, Queue
import time

result = []
base = 2
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21]

# create queue for new tasks
new_tasks = Queue(maxsize=0)

# put tasks in queue
print('Putting tasks in Queue')
for i in data:
    new_tasks.put(i)

# worker function definition
def f(q, p_num):
    print('Starting process: {}'.format(p_num))
    while not q.empty():
        # mimic some process being done
        time.sleep(0.05)
        print(q.get(), p_num)
    print('Finished', p_num)

print('initiating processes')
processes = []
for i in range(0, 2):
    if __name__ == '__main__':
        print('Creating process {}'.format(i))
        p = Process(target=f, args=(new_tasks, i))
        processes.append(p)

# record start time
start = time.time()

# start processes
for p in processes:
    p.start()

# wait for processes to finish
for p in processes:
    p.join()

# record end time
end = time.time()

# print time result
print('Time taken: {}'.format(end-start))
I expected this:
Putting tasks in Queue
initiating processes
Creating process 0
Creating process 1
Starting process: 1
Starting process: 0
1 1
2 0
3 1
4 0
5 1
6 0
7 1
8 0
9 1
10 0
11 1
23 0
45 1
76 0
4567 1
65423 0
45 1
4 0
3 1
21 0
Finished 1
Finished 0
Time taken: <some-time>
Instead, this is what I actually got:
Putting tasks in Queue
initiating processes
Creating process 0
Creating process 1
Time taken: 0.01000523567199707
Putting tasks in Queue
Putting tasks in Queue
initiating processes
Time taken: 0.0
Starting process: 1
initiating processes
Time taken: 0.0
Starting process: 0
1 1
2 0
3 1
4 0
5 1
6 0
7 1
8 0
9 1
10 0
11 1
23 0
45 1
76 0
4567 1
65423 0
45 1
4 0
3 1
21 0
Finished 0
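There seem to be two main problems, and I'm not sure how they are related:
1) Print statements such as "Putting tasks in Queue", "initiating processes" and "Time taken: 0.0" are systematically repeated in the output. I say "systematically" because the repetition is exactly the same on every run.
2) The second process never finishes: it never recognizes that the queue is empty, so it cannot exit.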
1) I can't reproduce this.
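Symptom 1 probably depends on the platform, which the question does not state, so what follows is an assumption. On Windows, and on macOS since Python 3.8, multiprocessing starts children with the "spawn" method, which imports the main script again in every child. All unguarded top-level code (filling the queue, the prints, the timing block) then runs once more per child, which would produce the extra "Putting tasks in Queue", "initiating processes" and "Time taken: 0.0" lines. On Linux the long-standing default, "fork", does not re-import the script, which may be why it could not be reproduced here. In the question the if __name__ == '__main__': guard covers only process creation. A minimal sketch that moves all top-level work into a hypothetical main() function behind the guard, and uses the non-blocking get from point 2 in the worker (f also returns a count, added only so it can be checked without starting processes):

```python
import queue
import time
from multiprocessing import Process, Queue


def f(q, p_num):
    """Worker: drain q without the racy empty() check; return how many items it handled."""
    print('Starting process: {}'.format(p_num))
    handled = 0
    while True:
        try:
            item = q.get(False)  # non-blocking: raises queue.Empty instead of hanging
        except queue.Empty:
            break
        time.sleep(0.05)  # mimic some processing
        print(item, p_num)
        handled += 1
    print('Finished', p_num)
    return handled  # ignored when f runs as a Process target


def main():
    # Hypothetical wrapper: everything that was top-level code in the question
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21]
    new_tasks = Queue()
    print('Putting tasks in Queue')
    for i in data:
        new_tasks.put(i)

    print('initiating processes')
    processes = []
    for i in range(2):
        print('Creating process {}'.format(i))
        processes.append(Process(target=f, args=(new_tasks, i)))

    start = time.time()
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    end = time.time()
    print('Time taken: {}'.format(end - start))


# Spawned children import this file under a name other than '__main__',
# so main() and its prints run only in the parent.
if __name__ == '__main__':
    main()
```

With this layout each message is printed once by the parent, and the children only execute f.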
2) Look at this code:
while not q.empty():
    time.sleep(0.05)
    print(q.get(), p_num)
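Each of these lines can be executed by either process, interleaved in any order. Suppose q holds one item and there are two processes, A and B, and consider the following execution order: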
# A runs
while not q.empty():
    time.sleep(0.05)
# B runs
while not q.empty():
    time.sleep(0.05)
# A runs
    print(q.get(), p_num)  # Removes and prints the last element of q
# B runs
    print(q.get(), p_num)  # q is now empty so q.get() blocks forever
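The interleaving above can be forced deterministically. This sketch swaps in threads and a queue.Queue for processes (an assumption made only so the demo is small and quick to run) and uses a threading.Barrier so that both consumers pass the empty() check before either calls get(). A get(timeout=0.5) stands in for the blocking get() so the demo terminates instead of hanging; racy_consumer and demo are hypothetical names.

```python
import queue
import threading


def racy_consumer(q, barrier, results, name):
    # Same check-then-act shape as the question's loop: test empty(), then get()
    if not q.empty():
        # Wait until both consumers have passed the empty() check
        barrier.wait()
        try:
            # The timeout stands in for "blocks forever" so the demo can finish
            results.append((name, q.get(timeout=0.5)))
        except queue.Empty:
            results.append((name, 'blocked'))


def demo():
    q = queue.Queue()
    q.put('last item')  # exactly one item left, as in the scenario above
    barrier = threading.Barrier(2)
    results = []
    threads = [
        threading.Thread(target=racy_consumer, args=(q, barrier, results, name))
        for name in ('A', 'B')
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == '__main__':
    print(demo())
```

demo() always returns two entries: one consumer receives 'last item' and the other records 'blocked', which is the point where the question's second process hangs. Which of A or B wins varies between runs.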
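Swapping the order of time.sleep and q.get removed the blocking in all of my runs, but it is still possible for more than one process to enter the loop when only one item is left.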
One way to fix this is to use a non-blocking get call and catch the queue.Empty exception:
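import queue
import time

def f(q, p_num):
    print('Starting process: {}'.format(p_num))
    while True:
        time.sleep(0.05)
        try:
            # non-blocking get: raises queue.Empty instead of waiting forever
            print(q.get(False), p_num)
        except queue.Empty:
            break
    print('Finished', p_num)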
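A different technique, not part of the answer above, avoids both empty() and non-blocking get: the parent enqueues one sentinel ("poison pill") per worker after the real tasks, and each worker uses a plain blocking get until it sees one. This also sidesteps a caveat in the multiprocessing documentation: right after a put there can be a short delay before a non-blocking get stops raising queue.Empty, so a non-blocking worker could in principle quit early. The sketch assumes None never appears in the real data; worker, main and SENTINEL are hypothetical names.

```python
import time
from multiprocessing import Process, Queue

# Hypothetical stop marker; assumes None never occurs in the real data.
# None unpickles to the same singleton, so the "is" check works across processes.
SENTINEL = None


def worker(q, p_num):
    """Process items until the sentinel arrives; return how many real items were handled."""
    handled = 0
    while True:
        item = q.get()  # blocking get is safe: every worker will receive one sentinel
        if item is SENTINEL:
            break
        time.sleep(0.05)  # mimic some processing
        print(item, p_num)
        handled += 1
    print('Finished', p_num)
    return handled  # ignored when worker runs as a Process target


def main(n_workers=2):
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21]
    tasks = Queue()
    for item in data:
        tasks.put(item)
    # One sentinel per worker, queued after all real tasks
    for _ in range(n_workers):
        tasks.put(SENTINEL)

    processes = [Process(target=worker, args=(tasks, i)) for i in range(n_workers)]
    start = time.time()
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print('Time taken: {}'.format(time.time() - start))


if __name__ == '__main__':
    main()
```

Each worker stops at the first sentinel it receives, so with one sentinel per worker every worker consumes exactly one and every join() returns.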