我的结构(大大简化)如下所示:
import multiprocessing
def creator():
# creates files
return
def relocator():
# moves created files
return
create = multiprocessing.Process(target=creator)
relocate = multiprocessing.Process(target=relocator)
create.start()
relocate.start()
我所试图做的是有一堆被创建的文件creator
,并尽快他们获得创造有他们移动到另一个目录relocator
。
我想在multiprocessing
这里使用的原因是:
creator
等待搬迁先完成,因为搬迁需要时间,我不想浪费。我希望creator
和relocator
进程都是串行的(每次一次一个文件),但要并行运行。动作的“日志”应如下所示:
# creating file 1
# creating file 2 and relocating file 1
# creating file 3 and relocating file 2
# ...
# relocating last file
根据我已阅读的内容,Queue
这里的方法是。
策略:(也许不是最好的一个?!)
创建文件后,它将进入队列,完成重定位后,将从队列中将其删除。
但是我在编码时遇到了问题;同时创建多个文件(creator
并行运行的多个实例)和其他文件...
对于任何想法,提示,解释等,我将不胜感激。
让我们提出您的想法并分解为以下功能:
创建者应创建文件(例如100个)
定位器应一次移动1个文件,直到没有更多文件可移动
创建者可能会在Relocator之前结束,因此它也可以将自己转变为Relocator,两者都必须知道何时完成
因此,我们有2个主要功能:
def create(i):
# creates files and return outpath
return os.path.join("some/path/based/on/stuff", "{}.ext".format(i))
def relocate(from, to):
# moves created files
shuttil.move(from, to)
现在让我们创建流程:
from multiprocessing import Process, Queue
comm_queue = Queue()
#process that create the files and push the data into the queue
def creator(comm_q):
for i in range(100):
comm_q.put(create(i))
comm_q.put("STOP_FLAG") # we tell the workers when to stop, we just push one since we only have one more worker
#the relocator works till it gets an stop flag
def relocator(comm_q):
data = comm_q.get()
while data != "STOP_FLAG":
if data:
relocate(data, to_path_you_may_want)
data = comm_q.get()
creator_process= multiprocessing.Process(target=creator, args=(comm_queue))
relocators = multiprocessing.Process(target=relocator, args=(comm_queue))
creator_process.start()
relocators .start()
这样,我们现在有一个创造者和重新定位,但是,现在,让我们说,我们要的造物主开始搬迁时,创造就业机会是由它来完成,我们可以只使用重新定位,但我们需要推动一个更"STOP_FLAG"
因为我们会有2个进程重定位
def creator(comm_q):
for i in range(100):
comm_q.put(create(i))
for _ in range(2):
comm_q.put("STOP_FLAG")
relocator(comm_q)
假设我们现在想要任意数量的重定位器进程,我们应该稍微调整一下代码来处理此问题,我们需要该creator
方法知道有多少个标志来通知其他进程何时停止,我们产生的代码看起来像这个:
from multiprocessing import Process, Queue, cpu_count
comm_queue = Queue()
#process that create the files and push the data into the queue
def creator(comm_q, number_of_subprocesses):
for i in range(100):
comm_q.put(create(i))
for _ in range(number_of_subprocesses + 1): # we need to count ourselves
comm_q.put("STOP_FLAG")
relocator(comm_q)
#the relocator works till it gets an stop flag
def relocator(comm_q):
data = comm_q.get()
while data != "STOP_FLAG":
if data:
relocate(data, to_path_you_may_want)
data = comm_q.get()
num_of_cpus = cpu_count() #we will spam as many processes as cpu core we have
creator_process= Process(target=creator, args=(comm_queue, num_of_cpus))
relocators = [Process(target=relocator, args=(comm_queue)) for _ in num_of_cpus]
creator_process.start()
for rp in relocators:
rp.start()
然后,您将需要等待它们完成:
creator_process.join()
for rp in relocators:
rp.join()
您可能需要查看multiprocessing.Queue
文档
专门针对该get
方法(默认情况下为阻塞调用)
从队列中删除并返回一个项目。如果可选的args块为True(默认值)且超时为None(默认值),则必要时进行阻塞,直到有可用项为止。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句