我正在研究任务处理解决方案。任务源是具有数千条记录的SQLlite DB。每个任务都是http请求,因此可能需要几秒钟才能完成。我决定使用asyncio进行处理。示例基于小的任务队列,因此按原样使用它们会占用大量内存,并且需要花费大量时间来填充任务列表。在文档中,看起来像这样
tasks = []
for i in range(1,10):
task = asyncio.create_task(worker(i))
tasks.append(task)
await asyncio.gather(tasks)
我想做的是从数据库中逐一读取任务,并处理它们,以保持并发限制在MAX_CONCURRENT之内。因此,这是我的肮脏技巧,但是我相信有一个更优雅的解决方案。
UPD早上一个小时值得晚上两个:)但是无论如何,我认为信号量的使用会更好,但是我不确定如何在循环中使用它
import random
import asyncio
import aiohttp
from aiohttp import ClientSession
from sqlitedict import SqliteDict
async def testWorker (id,url, db):
#placeholder url processing
await asyncio.sleep(random.randint(1,5))
async def main():
MAX_CONCURRENT = 5
db = SqliteDict('./taskdb.sqlite', autocommit=True)
tasks = set()
it = db.iteritems()
while True:
try:
id, url = next(it)
if (len(tasks) < MAX_CONCURRENT):
task = asyncio.create_task(testTask(id,url, db))
tasks.add(task)
else:
done, pending = await asyncio.wait(tasks,return_when=asyncio.FIRST_COMPLETED)
tasks = pending
except StopIteration:
break
done, pending = await asyncio.wait(tasks)
if __name__ == "__main__":
asyncio.run(main())
如您所发现,最惯用的方法是使用信号量。在这种情况下,您的循环将变得更加简单,因为它不再需要执行MAX_CONCURRENT
,而这一切都由信号量完成:
async def testWorker(id,url, db, semaphore):
# async with ensures that no more than MAX_CONCURRENT
# workers enter the block at the same time
async with semaphore:
await asyncio.sleep(random.randint(1,5))
async def main():
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
db = SqliteDict('./taskdb.sqlite', autocommit=True)
async with ClientSession() as session:
coros = [testTask(id, url, db, semaphore)
for id, url in db.iteritems()]
results = await asyncio.gather(*tasks)
另一种选择是启动固定数量的工人并通过队列与他们进行通信。这稍微复杂一点,但是对于任务数量可能巨大或无限制的情况,这是一个不错的选择。
async def testWorker(db, queue):
while True:
id, url = queue.get()
await asyncio.sleep(random.randint(1,5))
queue.task_done()
async def main():
queue = asyncio.Queue()
workers = [asyncio.create_task(db, queue)]
db = SqliteDict('./taskdb.sqlite', autocommit=True)
async with ClientSession() as session:
for id, url in db.iteritems():
await queue.put((id, url))
await queue.join()
# cancel the workers, which are now sitting idly
for w in workers:
w.cancel()
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句