创建带有嵌套承诺的承诺队列

杰弗雷尔

我正在实现一个批量获取和处理请求的查询引擎。我正在使用异步/等待。

现在,执行流程按层次结构运行,在该层次结构中有包含查询的项目列表,并且每个查询都有一个访存。

我想做的是将项目捆绑在n个组中,因此,即使每个项目中都有m个带有内部提取的查询,也只能同时运行n * m个请求;特别是只有一个请求会同时发送到同一域。

问题是,当我等待项目执行时(在外部级别,将项目分组并在一段时间内将停止迭代,直到promise分解),当内部查询的执行被推迟时,这些promise就会解决,因为内部等待提取。

这导致我的排队时间只是暂时停止,而不是等待内部承诺解决。

这是外部排队类:

class AsyncItemQueue {
  constructor(items, concurrency) {
    this.items = items;
    this.concurrency = concurrency;
  }

  run = async () => {
    let itemPromises = [];

    const bundles = Math.ceil(this.items.length / this.concurrency);
    let currentBundle = 0;

    while (currentBundle < bundles) {
      console.log(`<--------- FETCHING ITEM BUNDLE ${currentBundle} OF ${bundles} --------->`);

      const lowerRange = currentBundle * this.concurrency;
      const upperRange = (currentBundle + 1) * this.concurrency;

      itemPromises.push(
        this.items.slice(lowerRange, upperRange).map(item => item.run())
      );

      await Promise.all(itemPromises);

      currentBundle++;
    }
  };
}


export default AsyncItemQueue;

这是队列正在运行的简单项目类。我省略了多余的代码。

class Item {

// ...

  run = async () => {
    console.log('Item RUN', this, this.name);

    return await Promise.all(this.queries.map(query => {
      const itemPromise = query.run(this.name);
      return itemPromise;

    }));
  }
}

这是项目内包含的查询。每个项目都有一个查询列表。再次,一些代码被删除,因为它并不有趣。

class Query {

// ...


  run = async (item) => {
    // Step 1: If requisites, await.
    if (this.requires) {
      await this.savedData[this.requires];
    }

    // Step 2: Resolve URL.
    this.resolveUrl(item);

    // Step 3: If provides, create promise in savedData.
    const fetchPromise = this.fetch();

    if (this.saveData) {
      this.saveData.forEach(sd => (this.savedData[sd] = fetchPromise));
    }


    // Step 4: Fetch.
    const document = await fetchPromise;

    // ...
  }
}

中的whileAsyncItemQueue正确停止,但是直到执行流程到达中的步骤3为止Query一旦到达标准抓取功能的包装程序即抓取操作,外部答应就会解决,而我最终将同时执行所有请求。

我怀疑问题出在Query类中,但是我对如何避免外部承诺的解决感到困惑。

我试图使Queryrun函数返回文档,以防万一,但无济于事。

任何想法或指导将不胜感激。我将尝试回答有关代码的任何问题,或者在需要时提供更多问题。

谢谢!

PS:这是一个带有工作示例的codeandbox:https://codesandbox.io/s/goofy-tesla-iwzem

正如您在控制台出口中看到的那样,while循环在获取完成之前进行迭代,并且它们都在同一时间执行。

杰弗雷尔

我已经解决了

问题出在AsyncItemQueue班上。特别:

itemPromises.push(
  this.items.slice(lowerRange, upperRange).map(item => item.run())
);

那就是将承诺列表推入列表,因此,后来:

await Promise.all(itemPromises);

在该列表中未找到任何等待的诺言(因为它包含更多的列表,内含诺言)。

解决方案是将代码更改为:

await Promise.all(this.items.slice(lowerRange, upperRange).map(item => item.run()));

现在它运行良好。项目以n的批次运行,并且新的批次将在之前的批次完成之前运行。

我不确定这是否会帮助我,但我会留在这里,以防万一有人发现类似问题。谢谢您的帮助。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章