I have Python 3.9 code that uses `Pool` and `ThreadPool` from `multiprocessing.pool`. The goal is to have a `Pool` of 2 worker processes, each of which independently spawns a `ThreadPool` with 3 threads. In other words, I expect 2 * 3 = 6 threads running in parallel. However, the output of the minimal working example (MWE) below shows only 3 distinct thread IDs.
My question: why does this happen, and how can I fix it properly?
Also, if this `N_POOL * N_THREADPOOL` strategy looks like a bad idea, suggestions are welcome. The real task is I/O-bound (network downloads followed by lightweight preprocessing), and I am fairly new to parallelism.
from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import Queue
from threading import get_ident
import random
from time import sleep
from functools import partial

# Params
N = 12
N_CPU = 2
N_THREAD = 3

# Just for CPU numbering
CPU_QUEUE = Queue(N_CPU)
for i in range(1, 1 + N_CPU):
    CPU_QUEUE.put(i)


def split_list_to_pools(ls_data, n_pools):
    """Split data into pools as lists of approx. equal lengths."""
    n_each = int((len(ls_data) - 1) / n_pools) + 1
    return [ls_data[n_each * i:n_each * (i + 1)] for i in range(n_pools)]


def process_threadpool(data, CPU_NO=-1):
    """Process incoming data one-by-one"""
    sleep(3 + random.random())
    print(f"Threadpool id: {get_ident()} CPU_NO: {CPU_NO} / {N_CPU}, data: {data}")


def process_pool(ls_data):
    """Process a list of data."""
    # Get initial pool status
    CPU_NO = CPU_QUEUE.get()
    print(f"Pool CPU_NO: {CPU_NO}, data: {ls_data}")
    with ThreadPool(N_THREAD) as threadpool:
        for _ in threadpool.imap_unordered(partial(process_threadpool, CPU_NO=CPU_NO), ls_data):
            pass


if __name__ == '__main__':
    # given data
    ls_data = list(range(N))
    # split data to pools
    ls_ls_data = split_list_to_pools(ls_data, N_CPU)
    print(f"data rearranged for pool: {ls_ls_data}")
    # process in parallel
    with Pool(N_CPU) as pool:
        for _ in pool.imap_unordered(process_pool, ls_ls_data):
            pass
    print("Program Ended!")
Only 3 distinct thread IDs show up, instead of the expected 6:
$ python so.py
data rearranged for pool: [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]]
Pool CPU_NO: 1, data: [0, 1, 2, 3, 4, 5]
Pool CPU_NO: 2, data: [6, 7, 8, 9, 10, 11]
Threadpool id: 140065165276928 CPU_NO: 1 / 2, data: 2
Threadpool id: 140065165276928 CPU_NO: 2 / 2, data: 8
Threadpool id: 140065182062336 CPU_NO: 2 / 2, data: 6
Threadpool id: 140065182062336 CPU_NO: 1 / 2, data: 0
Threadpool id: 140065173669632 CPU_NO: 2 / 2, data: 7
Threadpool id: 140065173669632 CPU_NO: 1 / 2, data: 1
Threadpool id: 140065165276928 CPU_NO: 1 / 2, data: 3
Threadpool id: 140065182062336 CPU_NO: 2 / 2, data: 10
Threadpool id: 140065182062336 CPU_NO: 1 / 2, data: 4
Threadpool id: 140065165276928 CPU_NO: 2 / 2, data: 9
Threadpool id: 140065173669632 CPU_NO: 1 / 2, data: 5
Threadpool id: 140065173669632 CPU_NO: 2 / 2, data: 11
Program Ended!
Edit: the code runs under Debian 11.
You did not specify which platform you are running on, but I have to assume it is one that creates new processes using fork (e.g. Linux); otherwise I do not believe your code would work correctly, because under spawn each process in the pool would create its own copy of the global `CPU_QUEUE`, and so each would get the first item from the queue and believe it is CPU id 1.
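The pool-initializer pattern used to fix this can be sketched in isolation. The names `init_worker`, `take_one`, and `demo` below are illustrative, not from the question; the point is that `initargs` hands one shared `Queue` instance to every worker process at startup, so the workers draw from the same queue under both fork and spawn:

```python
from multiprocessing import Pool, Queue


def init_worker(q):
    # Store the queue handed over at process start in a module-level
    # global, so worker functions can reach it under fork AND spawn.
    global WORKER_QUEUE
    WORKER_QUEUE = q


def take_one(_):
    # Every task pops a distinct item from the one shared queue.
    return WORKER_QUEUE.get()


def demo():
    q = Queue()
    for worker_id in (1, 2):
        q.put(worker_id)
    with Pool(2, initializer=init_worker, initargs=(q,)) as pool:
        return sorted(pool.map(take_one, range(2)))


if __name__ == '__main__':
    print(demo())  # each id is claimed exactly once: [1, 2]
```

Passing the queue through `initargs` is required here: a `multiprocessing.Queue` cannot be pickled as an ordinary task argument, but it can be inherited through the pool's process-startup channel.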
Therefore, I made two changes to the code:
1. The global variable `CPU_QUEUE` is now initialized in each pool process with a single queue instance via a pool initializer, which makes the code more portable across platforms.
2. A `time.sleep` call was introduced at the start of `process_pool`, so that each process in the pool gets a chance to pick up one of the submitted tasks. Without it, a single pool process could in theory process all of the submitted tasks; the sleep merely makes that unlikely.
When I run the code under Linux, I see essentially what you see. But when I run it under Windows, which the changes above now let me do, I see:
data rearranged for pool: [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]]
Pool CPU_NO: 1, data: [6, 7, 8, 9, 10, 11]
Pool CPU_NO: 2, data: [0, 1, 2, 3, 4, 5]
Threadpool id: 16924 CPU_NO: 1 / 2, data: 8
Threadpool id: 15260 CPU_NO: 1 / 2, data: 6
Threadpool id: 19800 CPU_NO: 2 / 2, data: 1
Threadpool id: 7580 CPU_NO: 2 / 2, data: 2
Threadpool id: 20368 CPU_NO: 1 / 2, data: 7
Threadpool id: 18736 CPU_NO: 2 / 2, data: 0
Threadpool id: 19800 CPU_NO: 2 / 2, data: 3
Threadpool id: 16924 CPU_NO: 1 / 2, data: 9
Threadpool id: 7580 CPU_NO: 2 / 2, data: 4
Threadpool id: 15260 CPU_NO: 1 / 2, data: 10
Threadpool id: 18736 CPU_NO: 2 / 2, data: 5
Threadpool id: 20368 CPU_NO: 1 / 2, data: 11
Program Ended!
This is what you would expect to see. I can only conclude that under Linux `threading.get_ident`
returns values that are unique only within a single process. However, if you instead use `get_native_id`
(which I have incorporated into the source below), that does seem to give 6 unique values, as desired:
data rearranged for pool: [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]]
Pool CPU_NO: 1, data: [0, 1, 2, 3, 4, 5]
Pool CPU_NO: 2, data: [6, 7, 8, 9, 10, 11]
Threadpool id: 81 CPU_NO: 2 / 2, data: 7
Threadpool id: 83 CPU_NO: 2 / 2, data: 8
Threadpool id: 78 CPU_NO: 1 / 2, data: 0
Threadpool id: 79 CPU_NO: 2 / 2, data: 6
Threadpool id: 80 CPU_NO: 1 / 2, data: 1
Threadpool id: 82 CPU_NO: 1 / 2, data: 2
Threadpool id: 78 CPU_NO: 1 / 2, data: 3
Threadpool id: 83 CPU_NO: 2 / 2, data: 10
Threadpool id: 81 CPU_NO: 2 / 2, data: 9
Threadpool id: 79 CPU_NO: 2 / 2, data: 11
Threadpool id: 80 CPU_NO: 1 / 2, data: 4
Threadpool id: 82 CPU_NO: 1 / 2, data: 5
Program Ended!
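The difference between the two thread-id functions can be checked inside a single process. This is a minimal sketch, not part of the answer: `threading.get_ident()` is a Python-level id that is only meaningful within one interpreter (and may be reused after a thread exits), while `threading.get_native_id()` (Python 3.8+) is the OS-assigned thread id, unique system-wide among live threads:

```python
import threading

ids = []
barrier = threading.Barrier(3)


def record():
    # The barrier keeps all three threads alive at the same moment, so
    # neither kind of id can have been recycled when we capture it.
    barrier.wait()
    ids.append((threading.get_ident(), threading.get_native_id()))


threads = [threading.Thread(target=record) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

py_ids = {py for py, _ in ids}
os_ids = {native for _, native in ids}
print(len(py_ids), len(os_ids))  # 3 3: both kinds are unique within one process
```

Within one process both sets have 3 members; the collision the answer describes only appears when comparing `get_ident()` values *across* forked processes, where each child hands out its own ids independently.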
Modified source:
from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import Queue
#from threading import get_ident
from threading import get_native_id
import random
from time import sleep
from functools import partial

# Params
N = 12
N_CPU = 2
N_THREAD = 3


def init_pool_processes(the_queue):
    global CPU_QUEUE
    CPU_QUEUE = the_queue


def split_list_to_pools(ls_data, n_pools):
    """Split data into pools as lists of approx. equal lengths."""
    n_each = int((len(ls_data) - 1) / n_pools) + 1
    return [ls_data[n_each * i:n_each * (i + 1)] for i in range(n_pools)]


def process_threadpool(data, CPU_NO=-1):
    """Process incoming data one-by-one"""
    sleep(3 + random.random())
    print(f"Threadpool id: {get_native_id()} CPU_NO: {CPU_NO} / {N_CPU}, data: {data}")


def process_pool(ls_data):
    """Process a list of data."""
    # Give every pool process a chance to pick up a task
    sleep(.2)
    # Get initial pool status
    CPU_NO = CPU_QUEUE.get()
    print(f"Pool CPU_NO: {CPU_NO}, data: {ls_data}")
    with ThreadPool(N_THREAD) as threadpool:
        for _ in threadpool.imap_unordered(partial(process_threadpool, CPU_NO=CPU_NO), ls_data):
            pass


if __name__ == '__main__':
    # Just for CPU numbering
    CPU_QUEUE = Queue(N_CPU)
    for i in range(1, 1 + N_CPU):
        CPU_QUEUE.put(i)
    # given data
    ls_data = list(range(N))
    # split data to pools
    ls_ls_data = split_list_to_pools(ls_data, N_CPU)
    print(f"data rearranged for pool: {ls_ls_data}")
    # process in parallel
    with Pool(N_CPU, initializer=init_pool_processes, initargs=(CPU_QUEUE,)) as pool:
        for _ in pool.imap_unordered(process_pool, ls_ls_data):
            pass
    print("Program Ended!")