我编写了一个脚本来启动多个进程(简单的单元测试)以并行运行。它将一次执行N
具有num_workers
并行进程的作业。
我的第一个实现批量运行了多个进程,num_workers
并且看起来运行良好(我在false
这里使用命令来测试行为)
import subprocess
errors = 0
num_workers = 10
N = 100
i = 0
while i < N:
processes = []
for j in range(i, min(i+num_workers, N)):
p = subprocess.Popen(['false'])
processes.append(p)
[p.wait() for p in processes]
exit_codes = [p.returncode for p in processes]
errors += sum(int(e != 0) for e in exit_codes)
i += num_workers
print(f"There were {errors}/{N} errors")
但是,测试所花费的时间并不相同,因此有时我会等待缓慢的测试完成。因此,我重写了它,以便在完成任务时继续分配任务
import subprocess
import os
errors = 0
num_workers = 40
N = 100
assigned = 0
completed = 0
processes = set()
while completed < N:
if assigned < N:
p = subprocess.Popen(['false'])
processes.add((assigned, p))
assigned += 1
if len(processes) >= num_workers or assigned == N:
os.wait()
for i, p in frozenset(processes):
if p.poll() is not None:
completed += 1
processes.remove((i, p))
err = p.returncode
print(i, err)
if err != 0:
errors += 1
print(f"There were {errors}/{N} errors")
但是,这对于最后几个过程会产生错误的结果。例如,在上面的示例中,它产生98/100错误,而不是100错误。我检查了一下,这与并发无关;它与并发无关。由于某种原因,最近的两个作业以退出代码0返回。
为什么会这样?
问题出在哪里os.wait()
。正如文档所述,它不仅等待子进程退出,而且还返回该子进程的pid和“退出状态指示” 。这需要等待子进程终止。但子级终止后,其返回码将不再适用于poll
。这是重现该问题的简单测试:
import os
import subprocess
p = subprocess.Popen(['false'], stderr=subprocess.DEVNULL)
pid, retcode = os.wait()
print("From os.wait: {}".format(retcode))
print("From popen object before poll: {}".format(p.returncode))
p.poll()
print("From popen object after poll: {}".format(p.returncode))
njv@organon:~/tmp$ python false_runner.py
From os.wait: 256
From Popen object before poll: None
From Popen object after poll: 0
的_internal_poll
,由调用的源代码Popen.poll
清楚地说明了这里发生的情况:当Popen
尝试调用_waitpid
其子进程的pid时,它获取ChildProcessError: [Errno 10] No child processes
,并将其自身分配returncode
为0,因为此时无法确定子进程的返回码。
此例仅在您的示例中的最后几个子流程中发生的原因是因为os.wait
仅针对这种or assigned == N
情况被调用,而仅被调用一次或两次,因为您的子流程是如此之快。如果放慢速度,您将获得更多随机行为。
至于解决方法:我可能只是换个os.wait()
睡眠。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句