运行非异步功能,该功能仅在执行程序中构建大型列表?

HTF:

代码中的逻辑是通过(异步)HTTP请求提取数据,然后构建大量词典,其中随机生成其中一个值:

import asyncio
import random
import string
import time

from concurrent.futures import ProcessPoolExecutor
from itertools import cycle

from httpx import AsyncClient

URL = 'http://localhost:8080'
COUNT = 1_000_000


def rand_str(length=10):
    return ''.join(random.choice(string.ascii_uppercase) for i in range(length))


def parser(data, count):
    items = []

    for _, item in zip(range(count), cycle(data)):
        item['instance'] = rand_str()
        items.append(item)

    return items


async def parser_coro(data, count):
    items = []

    for _, item in zip(range(count), cycle(data)):
        item['instance'] = rand_str()
        items.append(item)

    return items


async def run_in_executor(func, pool, *args, **kwargs):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, func, *args, **kwargs)


async def main():
    async with AsyncClient(base_url=URL) as client:
        r = await client.get('/api/alerts/')
        data = r.json()

    # Case 1
    t1 = time.perf_counter()
    parser(data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 1 - sync: {t2 - t1:.3f}s')
    
    # Case 2
    t1 = time.perf_counter()
    await parser_coro(data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 2 - coro (no await): {t2 - t1:.3f}s')

    # Case 3
    t1 = time.perf_counter()
    await run_in_executor(parser, None, data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 3 - thread executor: {t2 - t1:.3f}s')

    # Case 4
    t1 = time.perf_counter()
    with ProcessPoolExecutor() as executor:
        await run_in_executor(parser, executor, data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 4 - process executor: {t2 - t1:.3f}s')


if __name__ == '__main__':
    asyncio.run(main(), debug=True)

测试:

$ python test.py 
Case 1 - sync: 6.593s
Case 2 - coro (no await): 6.565s
Executing <Task pending name='Task-1' coro=<main() running at test.py:63> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/futures.py:360, <TaskWakeupMethWrapper object at 0x7efff962a1f0>()] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:422> cb=[_run_until_complete_cb() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:184] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:591> took 13.176 seconds
Case 3 - thread executor: 6.675s
Case 4 - process executor: 6.726s

题:

我应该parser在执行程序中运行该函数,以便在生成列表时不阻塞主线程,否则在这种情况下将不起作用?在这种情况下,这实际上是受CPU或I / O约束的工作量吗?我猜没有任何IO,但是正在建立列表是一项CPU密集型任务,因此工作负载受CPU约束吗?

user4815162342:

我应该parser在执行程序中运行该函数,以便在生成列表时不阻塞主线程,否则在这种情况下将不起作用?

是的你应该。尽管全局解释器处于锁定状态,但使用单独的线程还是有帮助的,因为Python允许执行在parser不意识到的情况下从解析切换到异步线程因此,使用线程可以防止事件循环被阻塞6秒钟,或者运行该函数需要花费多长时间。

请注意,parser_coroparser体与没有执行程序变体没有什么不同,因为它不等待任何东西。await parser_coro(...)会像没有执行者的调用停止发泄循环parser(...)

在这种情况下,这实际上是受CPU或I / O约束的工作量吗?

我无法评论其余的工作量,但是编写的功能肯定是受CPU限制的。

我可以在ThreadPoolExecutor不阻塞的情况下运行ProcessPoolExecutor吗,或者它必须是a,因为它是受CPU约束的函数?

您可以在中运行它ThreadPoolExecutor只是,如果您有一堆并行运行,它们将共享相同的CPU内核。(但是它们不会阻塞其他协程,因为它们将在事件循环线程中运行。)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章