如何在不同线程中为 asyncio 运行 `loop_in_executor`?

凤凰97

所以,让我们说,我们有一个同步方法,如下所示:

def sync_method(param1, param2):
    # Complex method logic
    return "Completed"

我想run_in_executor在当前事件循环下的不同异步方法中运行上述方法一个例子如下:

async def run_sync_in_executor(param1, param2, pool=None):
    loop = asyncio.get_event_loop()
    value = loop.run_in_executor(pool, sync_method, param1, param2)
    # Some further changes to the variable `value`
    return value

现在,我想在循环遍历参数列表的同时运行上述方法,并最终修改最终输出。一种方法,我认为可行但不使用asyncio.gather

def main():
    params_list = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10]]
    output = await asyncio.gather(*[run_sync_in_executor(v[0], v[1]) for v in params_list])

当我阅读文档并理解时,这不起作用的原因是该方法run_sync_in_executor正在尝试访问当前事件循环,该循环由gather. 因为,每个事件循环只能有一个线程,甚至在此之前,第一个循环已经结束,由于gather以下方法的性质是尝试访问事件循环,这会导致错误。

作为解决方案,我想到了 using ThreadPoolExecutor,它可能会根据每个方法在执行时可以使用num_workers子句创建线程数pool我期待这样的事情:

with ThreadPoolExecutor(num_workers=8) as executor:
    for param in params_list:
        future = executor.submit(run_sync_in_executor, param[0], param[1], executor)
        print(future.result())

但是上面的方法是行不通的。如果有人能建议我实现预期目标的最佳方法是什么,那就太好了?

阿蒂奥姆·科济列夫

你的代码中有几个错误:你没有 awaited run_in_executormain应该是 async 函数。工作解决方案:

import asyncio
import time


def sync_method(param1, param2):
    """Some sync function"""
    time.sleep(5)
    return param1 + param2 + 10000


async def ticker():
    """Just to show that sync method does not block async loop"""
    while True:
        await asyncio.sleep(1)
        print("Working...")


async def run_sync_in_executor(param1, param2, pool=None):
    """Wrapper around run in executor"""
    loop = asyncio.get_event_loop()
    # run_in_executor should be awaited, otherwise run_in_executor
    # just returns coroutine (not its result!)
    value = await loop.run_in_executor(pool, sync_method, param1, param2)
    return value


async def amain():
    """Main should be async function !"""
    params_list = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10]]
    asyncio.create_task(ticker()) # runs in parallel, never awaited!
    output = await asyncio.gather(*[run_sync_in_executor(v[0], v[1]) for v in params_list])
    print(output)

if __name__ == '__main__':
    asyncio.run(amain())

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

asyncio的loop.run_in_executor线程安全吗?

我如何在asyncio上异步运行`loop.run_until_complete()`本身?

在不同线程中运行任务

asyncio后台线程:主线程阻塞中的运行功能

如何同时使用asyncio运行无限循环?

在asyncio中测试永久运行的任务

在 asyncio.as_completed 中运行并发任务

如何在asyncio中同时运行任务?

如何在不同的进程中同时运行两个 asyncio 循环?

asyncio.run_in_executor是多线程的吗?

使用asyncio在后台在线程类中运行函数

在没有 asyncio.get_event_loop() 的 run_in_executor 的异步方法中使用线程池

在两个不同的线程中运行asyncio以即时处理操作

如何在Twisted的asyncioreactor之上运行asyncio库代码?

如何在不等待的情况下运行Asyncio任务?

如何使用asyncio在python3中运行并行作业?

asyncio以不同的间隔定期运行两个不同的功能

Python asyncio:在辅助线程上运行subprocess_exec

asyncio是否支持从非主线程运行子进程?

在asyncio事件循环中在主线程上运行无限循环

如何运行不同线程的方法访问变量?

如何在Python中为线程设置asyncio事件循环?

在不同线程中运行协程循环

是文件系统在javascript中的不同线程上运行

多线程:如何在不同的线程中运行不同的功能?

如何在单独的线程中运行 asyncio/websockets 服务器:错误:-> 线程中没有当前事件循环

在来自不同线程的回调中设置asyncio.Future的值

Python 3.5 asyncio在事件循环中从不同线程中的同步代码执行协程

为什么在不同线程中调用asyncio subprocess.communicate会挂起?