寻找更优雅的任务处理解决方案

蒂姆·拉拉诺夫(Tim Rulanov)

我正在研究任务处理解决方案。任务源是具有数千条记录的SQLlite DB。每个任务都是http请求,因此可能需要几秒钟才能完成。我决定使用asyncio进行处理。示例基于小的任务队列,因此按原样使用它们会占用大量内存,并且需要花费大量时间来填充任务列表。在文档中,看起来像这样


    tasks = []
    for i in range(1,10):
            task = asyncio.create_task(worker(i))
            tasks.append(task)


     await asyncio.gather(tasks)

我想做的是从数据库中逐一读取任务,并处理它们,以保持并发限制在MAX_CONCURRENT之内。因此,这是我的肮脏技巧,但是我相信有一个更优雅的解决方案。

UPD早上一个小时值得晚上两个:)但是无论如何,我认为信号量的使用会更好,但是我不确定如何在循环中使用它

import random
import asyncio
import aiohttp
from aiohttp import ClientSession

from sqlitedict import SqliteDict

async def testWorker (id,url, db):
    #placeholder url processing
    await asyncio.sleep(random.randint(1,5))


async def main():

    MAX_CONCURRENT = 5
    db = SqliteDict('./taskdb.sqlite', autocommit=True)

    tasks = set()
    it = db.iteritems()
    while True:
        try:
            id, url = next(it)

            if (len(tasks) < MAX_CONCURRENT):
                task = asyncio.create_task(testTask(id,url, db))
                tasks.add(task)
            else:
                done, pending = await asyncio.wait(tasks,return_when=asyncio.FIRST_COMPLETED)
                tasks = pending

        except StopIteration:              
            break

    done, pending = await asyncio.wait(tasks)

if __name__ == "__main__":
    asyncio.run(main())

用户名

如您所发现,最惯用的方法是使用信号量在这种情况下,您的循环将变得更加简单,因为它不再需要执行MAX_CONCURRENT,而这一切都由信号量完成:

async def testWorker(id,url, db, semaphore):
    # async with ensures that no more than MAX_CONCURRENT
    # workers enter the block at the same time
    async with semaphore:
        await asyncio.sleep(random.randint(1,5))

async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    db = SqliteDict('./taskdb.sqlite', autocommit=True)

    async with ClientSession() as session:
        coros = [testTask(id, url, db, semaphore)
                 for id, url in db.iteritems()]
        results = await asyncio.gather(*tasks)

另一种选择是启动固定数量的工人并通过队列与他们进行通信这稍微复杂一点,但是对于任务数量可能巨大或无限制的情况,这是一个不错的选择。

async def testWorker(db, queue):
    while True:
        id, url = queue.get()
        await asyncio.sleep(random.randint(1,5))
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    workers = [asyncio.create_task(db, queue)]
    db = SqliteDict('./taskdb.sqlite', autocommit=True)

    async with ClientSession() as session:
        for id, url in db.iteritems():
            await queue.put((id, url))
        await queue.join()

    # cancel the workers, which are now sitting idly
    for w in workers:
        w.cancel()

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

“ for”循环的更优雅的解决方案

图形实现更优雅的解决方案?

ASP.NET Core 2.0 Web API实体框架核心-寻找更优雅的解决方案

:nth-child内容替换的更优雅的解决方案

这个问题有更优雅的解决方案吗?

为我的练习任务寻找更“pythonic”的解决方案

寻找一种更优雅的方式来解决此任务

寻找更优雅的方法来解决这个简单的逻辑任务

使用Jackson进行不对称的名称/属性映射,更优雅的解决方案?

从属性的逗号分隔列表创建数组的更优雅的解决方案?

是否有一种更优雅的解决方案来填充地图中的列表?

针对同一领域的多个OR的更优雅的解决方案

需要更优雅的解决方案以达到均匀的字符串长度

在异步函数中返回承诺-什么是更优雅/更好的实践解决方案?

JQuery 在单击和悬停文本链接时显示 div - 更优雅的解决方案?

有没有更优雅的解决方案来修改这些文件名?

在度中心度networkx值字典中找到最大值的键的更优雅的解决方案?

Java中的自然语言处理解决方案?

任何惯用的生成/预处理解决方案库?

这个重复代码的优雅解决方案?

优雅的解析日期的解决方案

什么是条件比较的优雅解决方案?

寻求优雅的linq解决方案

在设置大量计时器或使用计划的任务队列之间寻找解决方案

我正在寻找有关 Python 中列表的任务的解决方案

寻找 fetch js/css 解决方案

寻找解决方案时间戳PostgreSQL

寻找最佳解决方案

寻找解决方案以缩短IF公式