假设我有以下代码:
from scipy import *
import multiprocessing as mp
num_cores = mp.cpu_count()
from joblib import Parallel, delayed
import matplotlib.pyplot as plt
def func(x,y):
return y/x
def main(y, xmin,xmax, dx):
x = arange(xmin,xmax,dx)
output = Parallel(n_jobs=num_cores)(delayed(func)(i, y) for i in x)
return x, asarray(output)
def demo():
x,z = main(2.,1.,30.,.1)
plt.plot(x,z, label='All values')
plt.plot(x[z>.1],z[z>.1], label='desired range') ## This is better to do in main()
plt.show()
demo()
我只想计算输出,直到输出>给定数(可以假定输出元素随x的增加而单调减少),然后停止(不计算x的所有值然后进行排序,这对我而言效率不高)。有什么方法可以使用并行,延迟或任何其他多处理功能来做到这一点?
没有output > a given number
具体说明,所以我只做了一个。经过测试,我必须扭转条件才能正确操作output < a given number
。
我将使用一个池,使用回调函数启动进程以检查停止条件,然后在准备好时终止池。但这会导致争用条件,从而导致不允许运行的进程省略结果。我认为此方法对您的代码的修改最少,并且非常易于阅读。不能保证列表的顺序。
优点:开销很少
缺点:可能缺少结果。
方法1)
from scipy import *
import multiprocessing
import matplotlib.pyplot as plt
def stop_condition_callback(ret):
output.append(ret)
if ret < stop_condition:
worker_pool.terminate()
def func(x, y, ):
return y / x
def main(y, xmin, xmax, dx):
x = arange(xmin, xmax, dx)
print("Number of calculations: %d" % (len(x)))
# add calculations to the pool
for i in x:
worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)
# wait for the pool to finish/terminate
worker_pool.close()
worker_pool.join()
print("Number of results: %d" % (len(output)))
return x, asarray(output)
def demo():
x, z_list = main(2., 1., 30., .1)
plt.plot(z_list, label='desired range')
plt.show()
output = []
stop_condition = 0.1
worker_pool = multiprocessing.Pool()
demo()
此方法有更多开销,但将允许已开始的过程完成。方法2)
from scipy import *
import multiprocessing
import matplotlib.pyplot as plt
def stop_condition_callback(ret):
if ret is not None:
if ret < stop_condition:
worker_stop.value = 1
else:
output.append(ret)
def func(x, y, ):
if worker_stop.value != 0:
return None
return y / x
def main(y, xmin, xmax, dx):
x = arange(xmin, xmax, dx)
print("Number of calculations: %d" % (len(x)))
# add calculations to the pool
for i in x:
worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)
# wait for the pool to finish/terminate
worker_pool.close()
worker_pool.join()
print("Number of results: %d" % (len(output)))
return x, asarray(output)
def demo():
x, z_list = main(2., 1., 30., .1)
plt.plot(z_list, label='desired range')
plt.show()
output = []
worker_stop = multiprocessing.Value('i', 0)
stop_condition = 0.1
worker_pool = multiprocessing.Pool()
demo()
方法3)优点:不会遗漏任何结果
缺点:此步骤超出了您通常的操作范围。
采取方法1并添加
def stopPoolButLetRunningTaskFinish(pool):
# Pool() shutdown new task from being started, by emptying the query all worker processes draw from
while pool._task_handler.is_alive() and pool._inqueue._reader.poll():
pool._inqueue._reader.recv()
# Send sentinels to all worker processes
for a in range(len(pool._pool)):
pool._inqueue.put(None)
然后改变 stop_condition_callback
def stop_condition_callback(ret):
if ret[1] < stop_condition:
#worker_pool.terminate()
stopPoolButLetRunningTaskFinish(worker_pool)
else:
output.append(ret)
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句