在多处理队列中挣扎

马0

我的结构(大大简化)如下所示:

import multiprocessing

def creator():
    # creates files
    return


def relocator():
    # moves created files
    return


create = multiprocessing.Process(target=creator)
relocate = multiprocessing.Process(target=relocator)
create.start()
relocate.start()

我所试图做的是有一堆被创建的文件creator,并尽快他们获得创造有他们移动到另一个目录relocator

我想在multiprocessing这里使用的原因是:

  • 我不想creator等待搬迁先完成,因为搬迁需要时间,我不想浪费。
  • 也不能选择先创建所有文件,然后再开始复制,因为驱动器中没有足够的空间来容纳所有文件。

我希望creatorrelocator进程都是串行的(每次一次一个文件),但要并行运行。动作的“日志”应如下所示:

# creating file 1
# creating file 2 and relocating file 1
# creating file 3 and relocating file 2
# ...
# relocating last file

根据我已阅读的内容,Queue这里的方法是。

策略:(也许不是最好的一个?!)

创建文件后,它将进入队列,完成重定位后,将从队列中将其删除。

但是我在编码时遇到了问题;同时创建多个文件(creator并行运行的多个实例)和其他文件...

对于任何想法,提示,解释等,我将不胜感激。

网波

让我们提出您的想法并分解为以下功能:

  1. 创建者应创建文件(例如100个)

  2. 定位器应一次移动1个文件,直到没有更多文件可移动

  3. 创建者可能会在Relocator之前结束,因此它也可以将自己转变为Relocator,两者都必须知道何时完成

因此,我们有2个主要功能:

def create(i):
    # creates files and return outpath
    return os.path.join("some/path/based/on/stuff", "{}.ext".format(i))


def relocate(from, to):
    # moves created files
    shuttil.move(from, to)

现在让我们创建流程:

from multiprocessing import Process, Queue

comm_queue = Queue()

#process that create the files and push the data into the queue
def creator(comm_q):
    for i in range(100):
        comm_q.put(create(i))
    comm_q.put("STOP_FLAG") # we tell the workers when to stop, we just push one since we only have one more worker

#the relocator works till it gets an stop flag
def relocator(comm_q):
    data = comm_q.get()
    while data != "STOP_FLAG":
        if data:
            relocate(data, to_path_you_may_want)
        data = comm_q.get()

creator_process= multiprocessing.Process(target=creator, args=(comm_queue))
relocators = multiprocessing.Process(target=relocator, args=(comm_queue))
creator_process.start()
relocators .start()

这样,我们现在有一个创造者和重新定位,但是,现在,让我们说,我们要的造物主开始搬迁时,创造就业机会是由它来完成,我们可以只使用重新定位,但我们需要推动一个更"STOP_FLAG"因为我们会有2个进程重定位

def creator(comm_q):
    for i in range(100):
        comm_q.put(create(i))
    for _ in range(2):
        comm_q.put("STOP_FLAG")
    relocator(comm_q)

假设我们现在想要任意数量的重定位器进程,我们应该稍微调整一下代码来处理此问题,我们需要该creator方法知道有多少个标志来通知其他进程何时停止,我们产生的代码看起来像这个:

from multiprocessing import Process, Queue, cpu_count

comm_queue = Queue()

#process that create the files and push the data into the queue
def creator(comm_q, number_of_subprocesses):
    for i in range(100):
        comm_q.put(create(i))
    for _ in range(number_of_subprocesses + 1): # we need to count ourselves
        comm_q.put("STOP_FLAG")
    relocator(comm_q)

#the relocator works till it gets an stop flag
def relocator(comm_q):
    data = comm_q.get()
    while data != "STOP_FLAG":
        if data:
            relocate(data, to_path_you_may_want)
        data = comm_q.get()

num_of_cpus = cpu_count() #we will spam as many processes as cpu core we have
creator_process= Process(target=creator, args=(comm_queue, num_of_cpus))
relocators = [Process(target=relocator, args=(comm_queue)) for _ in num_of_cpus]
creator_process.start()
for rp in relocators:
    rp.start()

然后,您将需要等待它们完成:

creator_process.join()
for rp in relocators:
    rp.join()

您可能需要查看multiprocessing.Queue文档

专门针对该get方法(默认情况下为阻塞调用)

从队列中删除并返回一个项目。如果可选的args块为True(默认值)且超时为None(默认值),则必要时进行阻塞,直到有可用项为止。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章