Python多处理池映射和imap

航空

我有一个可以正常工作multiprocessing脚本pool.map问题在于,并非所有进程都需要花费很长时间才能完成,因此某些进程因为要等到所有进程完成才入睡(与此问题相同)。有些文件在不到一秒钟的时间内完成,其他文件则需要几分钟(或几小时)。

如果我正确理解了本手册(和本帖子)pool.imap则不是在等待所有过程完成,如果已经完成,则是在提供一个新文件进行处理。当我尝试这样做时,脚本将加快要处理的文件的速度,小的文件将按预期方式处理,大的文件(需要更多时间来处理)直到结尾都没有完成(被杀死,恕不另行通知?)。这是正常现象pool.imap吗,还是我需要添加更多命令/参数?当我time.sleep(100)else测试中添加零件时,它正在处理更大的文件,但其他进程进入了睡眠状态。有什么建议么 ?谢谢

def process_file(infile):
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    #nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = []
        for infile in os.listdir():  
            todolist.append(infile)
        try:   
            p = Pool(processes=nprocesses)
            p.imap(process_file, todolist)
        except KeyboardInterrupt:                
            print("Shutting processes down")
           # Optionally try to gracefully shut down the worker processes here.       
            p.close()
            p.terminate()
            p.join()
        except StopIteration:
            continue     
        else:
            time.sleep(100)
            os.chdir('..')
        p.close()
        p.join() 

if __name__ == '__main__':
    main()    
拉贾

由于您已经将所有文件放入列表中,因此可以将它们直接放入队列中。然后,该队列将与您的子流程共享,这些子流程将从队列中获取文件名并执行其工作。无需重复两次(首先进入列表,然后由Pool.imap进行泡菜列表)。Pool.imap的功能完全相同,但您不知道。

todolist = []
for infile in os.listdir():  
    todolist.append(infile)

可以替换为:

todolist = Queue()
for infile in os.listdir():  
    todolist.put(infile)

完整的解决方案如下所示:

def process_file(inqueue):
    for infile in iter(inqueue.get, "STOP"):
        #do stuff until inqueue.get returns "STOP"
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = Queue()
        for infile in os.listdir():  
            todolist.put(infile)
        process = [Process(target=process_file,
                      args=(todolist) for x in range(nprocesses)]
        for p in process:
            #task the processes to stop when all files are handled
            #"STOP" is at the very end of queue
            todolist.put("STOP")
        for p in process:
            p.start()
        for p in process:
            p.join()    
if __name__ == '__main__':
    main()

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章