所以,让我们说,我们有一个同步方法,如下所示:
def sync_method(param1, param2):
# Complex method logic
return "Completed"
我想run_in_executor
在当前事件循环下的不同异步方法中运行上述方法。一个例子如下:
async def run_sync_in_executor(param1, param2, pool=None):
loop = asyncio.get_event_loop()
value = loop.run_in_executor(pool, sync_method, param1, param2)
# Some further changes to the variable `value`
return value
现在,我想在循环遍历参数列表的同时运行上述方法,并最终修改最终输出。一种方法,我认为可行但不使用asyncio.gather
:
def main():
params_list = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10]]
output = await asyncio.gather(*[run_sync_in_executor(v[0], v[1]) for v in params_list])
当我阅读文档并理解时,这不起作用的原因是该方法run_sync_in_executor
正在尝试访问当前事件循环,该循环由gather
. 因为,每个事件循环只能有一个线程,甚至在此之前,第一个循环已经结束,由于gather
以下方法的性质是尝试访问事件循环,这会导致错误。
作为解决方案,我想到了 using ThreadPoolExecutor
,它可能会根据每个方法在执行时可以使用的num_workers
子句创建线程数pool
。我期待这样的事情:
with ThreadPoolExecutor(num_workers=8) as executor:
for param in params_list:
future = executor.submit(run_sync_in_executor, param[0], param[1], executor)
print(future.result())
但是上面的方法是行不通的。如果有人能建议我实现预期目标的最佳方法是什么,那就太好了?
你的代码中有几个错误:你没有 awaited run_in_executor
,main
应该是 async 函数。工作解决方案:
import asyncio
import time
def sync_method(param1, param2):
"""Some sync function"""
time.sleep(5)
return param1 + param2 + 10000
async def ticker():
"""Just to show that sync method does not block async loop"""
while True:
await asyncio.sleep(1)
print("Working...")
async def run_sync_in_executor(param1, param2, pool=None):
"""Wrapper around run in executor"""
loop = asyncio.get_event_loop()
# run_in_executor should be awaited, otherwise run_in_executor
# just returns coroutine (not its result!)
value = await loop.run_in_executor(pool, sync_method, param1, param2)
return value
async def amain():
"""Main should be async function !"""
params_list = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10]]
asyncio.create_task(ticker()) # runs in parallel, never awaited!
output = await asyncio.gather(*[run_sync_in_executor(v[0], v[1]) for v in params_list])
print(output)
if __name__ == '__main__':
asyncio.run(amain())
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句