我的代码看起来像这样
def myfunc(param):
# expensive stuff that takes 2-3h
mylist = [...]
client = Client(...)
mgr = DeploymentMgr()
# ... setup stateful set ...
futures = client.map(myfunc, mylist, ..., resources={mgr.hash.upper(): 1})
client.gather(futures)
我在Kubernetes集群上运行了dask。在程序开始时,我创建一个有状态集合。这是通过完成的kubernetes.client.AppsV1Api()
。然后,我最多等待30分钟,直到我请求的所有工人都可用。对于此示例,假设我请求10个工作人员,但30分钟后,只有7个工作人员可用。最后,我调用client.map()
并将函数和列表传递给它。此列表包含10个元素。但是,dask将仅使用7个工人来处理此列表!即使几分钟后剩下的3个工作线程可用,即使第一个元素的处理都没有完成,dask也不会为其分配任何列表元素。
我该如何改变行为方式?有没有一种方法可以告诉dask(或dask的调度程序)定期检查新到的工人并更“正确地”分配工作?还是可以手动影响这些列表元素的分布?
谢谢。
一旦更好地了解任务将花费多长时间,Dask将平衡负载。您可以使用配置值来估算任务长度
distributed:
scheduler:
default-task-durations:
myfunc: 1hr
或者,一旦Dask完成了其中一项任务,它将知道将来如何围绕该任务做出决策。
我相信这在GitHub问题追踪器上也曾出现过几次。您可能希望通过https://github.com/dask/distributed/issues搜索以获取更多信息。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句