Multiprocessing and queues

Harry de Winton

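This code attempts to use a queue to feed tasks to a number of worker processes.

I want to time the difference in speed between different numbers of processes and different methods of handling the data.

But the output is not doing what I thought it would.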

from multiprocessing import Process, Queue
import time
result = []

base = 2

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21]

# create queue for new tasks
new_tasks = Queue(maxsize=0)

# put tasks in queue
print('Putting tasks in Queue')
for i in data:
    new_tasks.put(i)

# worker function definition
def f(q, p_num):
    print('Starting process: {}'.format(p_num))
    while not q.empty():
        # mimic some process being done
        time.sleep(0.05)
        print(q.get(), p_num)
    print('Finished', p_num)

print('initiating processes')
processes = []
for i in range(0, 2):
    if __name__ == '__main__':
        print('Creating process {}'.format(i))
        p = Process(target=f, args=(new_tasks, i))
        processes.append(p)
#record start time
start = time.time()

# start process
for p in processes:
    p.start()

# wait for all processes to finish
for p in processes:
    p.join()

#record end time
end = time.time()

# print time result
print('Time taken: {}'.format(end-start))

I expected this:

Putting tasks in Queue
initiating processes
Creating process 0
Creating process 1
Starting process: 1
Starting process: 0
1 1
2 0
3 1
4 0
5 1
6 0
7 1
8 0
9 1
10 0
11 1
23 0
45 1
76 0
4567 1
65423 0
45 1
4 0
3 1
21 0
Finished 1
Finished 0
Time taken: <some-time>

But instead I actually get this:

Putting tasks in Queue
initiating processes
Creating process 0
Creating process 1
Time taken: 0.01000523567199707
Putting tasks in Queue
Putting tasks in Queue
initiating processes
Time taken: 0.0
Starting process: 1
initiating processes
Time taken: 0.0
Starting process: 0
1 1
2 0
3 1
4 0
5 1
6 0
7 1
8 0
9 1
10 0
11 1
23 0
45 1
76 0
4567 1
65423 0
45 1
4 0
3 1
21 0
Finished 0

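There seem to be two main problems, and I am not sure how they are related:

  1. Print statements such as Putting tasks in Queue, initiating processes and Time taken: 0.0 are repeated systematically in the output. I say systematically because they repeat in exactly the same way on every run.

  2. The second process never finishes. It never recognises that the queue is empty, so it never exits.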

Iku

1) I cannot reproduce this.
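One possible explanation for the repeated output in point 1, offered as an assumption because the question does not state the platform: where multiprocessing starts children with the spawn start method (the default on Windows and on macOS), each child re-imports the main module, so every statement at module level runs again in every child. In the question only the Process creation sits under the if __name__ == '__main__': check, so the prints, the queue filling and the timing would all rerun in each child, which matches the repeated lines. Where children are forked instead, nothing is re-imported, which may be why the problem does not show up here. A minimal sketch of the restructuring, with the hypothetical names worker and main and a shortened data list:

```python
from multiprocessing import Process, Queue
import queue
import time


def worker(q, p_num, delay=0.05):
    """Take items from q until a non-blocking get reports it empty."""
    handled = []
    while True:
        time.sleep(delay)  # mimic some work being done
        try:
            item = q.get(False)
        except queue.Empty:
            break
        print(item, p_num)
        handled.append(item)
    print('Finished', p_num)
    return handled


def main(n_procs=2):
    data = [1, 2, 3, 4, 5]  # shortened sample data, an assumption
    new_tasks = Queue()
    for i in data:
        new_tasks.put(i)
    processes = [Process(target=worker, args=(new_tasks, i))
                 for i in range(n_procs)]
    start = time.time()
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print('Time taken: {}'.format(time.time() - start))


if __name__ == '__main__':
    # Only the parent process gets here; a spawned child re-imports this
    # file under a different module name and skips this call.
    main()
```

With everything except the imports and function definitions moved under the guard, a re-import in a child only defines worker and main and prints nothing.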

2) Look at the following code:

while not q.empty():
    time.sleep(0.05)
    print(q.get(), p_num)

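Any process can run any of these lines at any time, interleaved with the other processes. Now consider q holding a single item and two processes, A and B, with the following order of execution: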

# A runs
while not q.empty():
    time.sleep(0.05)

# B runs
while not q.empty():
    time.sleep(0.05)

# A runs
print(q.get(), p_num)  # Removes and prints the last element of q

# B runs
print(q.get(), p_num)  # q is now empty so q.get() blocks forever

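Swapping the order of time.sleep and q.get got rid of the blocking in all of my runs, but it is still possible for more than one process to enter the loop when only one item is left.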
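The interleaving can be forced deterministically in a small sketch. To keep it self-contained, this uses threads and queue.Queue rather than processes, a threading.Barrier to make both consumers pass the empty() check before either calls get(), and a short timeout to stand in for "blocks forever". The names racy_consumer and demo are hypothetical:

```python
import queue
import threading


def racy_consumer(q, barrier, outcome, name):
    if not q.empty():          # both threads see the single item here
        barrier.wait()         # hold both threads past the check
        try:
            # The timeout stands in for a get() that would never return.
            outcome[name] = q.get(timeout=0.2)
        except queue.Empty:
            outcome[name] = 'would block forever'


def demo():
    q = queue.Queue()
    q.put('last item')
    barrier = threading.Barrier(2)
    outcome = {}
    threads = [threading.Thread(target=racy_consumer,
                                args=(q, barrier, outcome, name))
               for name in ('A', 'B')]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return outcome


if __name__ == '__main__':
    print(demo())
```

Exactly one of A and B receives the item and the other times out. Which one wins is not determined, but the check-then-get pair is never atomic, and that is the whole problem.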

The way to fix this is to use a non-blocking get call and catch the queue.Empty exception:

import queue

while True:
    time.sleep(0.05)
    try:
        print(q.get(False), p_num)
    except queue.Empty:
        break
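One caveat, as far as I am aware: with a multiprocessing.Queue, a non-blocking get can raise queue.Empty while items are still on their way, for example when another process holds the queue's read lock at that instant. The remaining workers still pick the items up, but a worker may quit earlier than expected, which matters when timing different process counts. A common alternative, which is not what the code above does, is to put one sentinel value per worker on the queue and use a plain blocking get. A minimal sketch, assuming None never occurs as a real task and using the hypothetical names SENTINEL, worker and run:

```python
from multiprocessing import Process, Queue
import time

SENTINEL = None  # hypothetical marker meaning "no more tasks"


def worker(tasks, results, p_num, delay=0.05):
    """Process tasks until the sentinel arrives, then stop."""
    while True:
        item = tasks.get()      # blocking is safe: a sentinel always arrives
        if item is SENTINEL:
            break
        time.sleep(delay)       # mimic some work being done
        results.put((item, p_num))


def run(data, n_procs=2):
    tasks, results = Queue(), Queue()
    for item in data:
        tasks.put(item)
    for _ in range(n_procs):    # one sentinel per worker
        tasks.put(SENTINEL)
    processes = [Process(target=worker, args=(tasks, results, i))
                 for i in range(n_procs)]
    for p in processes:
        p.start()
    # Collect one result per task before joining, so that no child is
    # left waiting to flush its results when join() is called.
    out = [results.get() for _ in data]
    for p in processes:
        p.join()
    return out


if __name__ == '__main__':
    print(run([1, 2, 3, 4, 5]))
```

Each worker consumes exactly one sentinel and exits, so no process depends on empty() or on the timing of a non-blocking get. The order of the returned pairs depends on scheduling.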
