Dask multiprocessing fails with loop parallelisation, including calls to MongoDB, when the iteration count is high enough

Mittal

I am trying to run a kind of simulation in a Python for loop in parallel using Dask multiprocessing. The parallelisation works fine when the number of iterations is fairly low, but it fails when the count increases. The issue occurs on Win7 (4 cores, 10 GB RAM), Win10 (8 cores, 8 GB RAM) and on an Azure VM running Windows Server 2016 (16 cores, 32 GB RAM). The slowest machine, the Win7 one, gets through the most iterations before failing. The problem can be mitigated by adding a long enough sleep at the end of every function involved in the process, but the required sleep time results in very poor performance, comparable to running sequentially.

I hope someone can help me out here. Thanks in advance for your comments and answers!

The following simple code contains some of the phases of the for loop and reproduces the error.

import json

import numpy as np
import pandas as pd
from pymongo import MongoClient

# Create random DataFrame
df = pd.DataFrame(np.random.randint(0,100,size=(100,11)), columns=list('ABCDEFGHIJK'))

# Save to Mongo
client = MongoClient()
db = client.errordemo
res = db.errordemo.insert_many(json.loads(df.to_json(orient='records')))
db.client.close()


class ToBeRunParallel:

    def __init__(self):
        pass

    def functionToBeRunParallel(self, i):

        # Read data from mongo
        with MongoClient() as client:
            db = client.errordemo
            dataFromMongo = pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))

        # Randomize data
        dataRand = dataFromMongo.apply(pd.to_numeric).apply(rand, volatility=0.1)

        # Sum rows
        dataSum = dataRand.sum(axis=1)

        # Select randomly one of the resulting values and return
        return dataSum.sample().values[0]
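
The helper `rand` used above is not shown in the question. A minimal sketch of what such a per-column randomiser could look like (purely an assumption for illustration, not the original implementation):

import numpy as np

def rand(column, volatility=0.1):
    # Hypothetical stand-in for the rand() helper referenced above (assumption):
    # jitter every value in the column by up to +/- volatility.
    noise = np.random.uniform(-volatility, volatility, size=len(column))
    return column * (1 + noise)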

The function functionToBeRunParallel is called either in a console or in Jupyter (both fail). 'errordemo' is a local module containing the class ToBeRunParallel. When running on the Azure VM, the code executes successfully with 500 loops and fails with 5,000.

import errordemo
from dask import delayed, compute, multiprocessing

# Determine how many times to loop
rng = range(15000)

# Define empty result list
resList = []

# Create instance
err = errordemo.ToBeRunParallel()

# Loop in parallel using Dask
for i in rng:
    sampleValue = delayed(err.functionToBeRunParallel)(i)
    resList.append(sampleValue)

# Compute in parallel 
result = compute(*resList, get=multiprocessing.get)

The error stack in Jupyter is as follows.

---------------------------------------------------------------------------
AutoReconnect                             Traceback (most recent call last)
<ipython-input-3-9f535dd4c621> in <module>()
----> 1 get_ipython().run_cell_magic('time', '', '# Determine how many times to loop\nrng = range(50000)\n\n# Define empty result lists\nresList = []\n\n# Create instance\nerr = errordemo.ToBeRunParallel()\n\n# Loop in parallel using Dask\nfor i in rng:\n    sampleValue = delayed(err.functionToBeRunParallel)(i)\n    resList.append(sampleValue)\n    \n# Compute in parallel \nresult = compute(*resList, get=dask.multiprocessing.get)')

C:\ProgramData\Anaconda3\lib\site-packages\IPython\core\interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
   2113             magic_arg_s = self.var_expand(line, stack_depth)
   2114             with self.builtin_trap:
-> 2115                 result = fn(magic_arg_s, cell)
   2116             return result
   2117 

<decorator-gen-60> in time(self, line, cell, local_ns)

C:\ProgramData\Anaconda3\lib\site-packages\IPython\core\magic.py in <lambda>(f, *a, **k)
    186     # but it's overkill for just that one bit of state.
    187     def magic_deco(arg):
--> 188         call = lambda f, *a, **k: f(*a, **k)
    189 
    190         if callable(arg):

C:\ProgramData\Anaconda3\lib\site-packages\IPython\core\magics\execution.py in time(self, line, cell, local_ns)
   1178         else:
   1179             st = clock2()
-> 1180             exec(code, glob, local_ns)
   1181             end = clock2()
   1182             out = None

<timed exec> in <module>()

C:\ProgramData\Anaconda3\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    200     dsk = collections_to_dsk(variables, optimize_graph, **kwargs)
    201     keys = [var._keys() for var in variables]
--> 202     results = get(dsk, keys, **kwargs)
    203 
    204     results_iter = iter(results)

C:\ProgramData\Anaconda3\lib\site-packages\dask\multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
     85         result = get_async(pool.apply_async, len(pool._pool), dsk3, keys,
     86                            get_id=_process_get_id,
---> 87                            dumps=dumps, loads=loads, **kwargs)
     88     finally:
     89         if cleanup:

C:\ProgramData\Anaconda3\lib\site-packages\dask\async.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, dumps, loads, **kwargs)
    498                     _execute_task(task, data)  # Re-execute locally
    499                 else:
--> 500                     raise(remote_exception(res, tb))
    501             state['cache'][key] = res
    502             finish_task(dsk, key, state, results, keyorder.get)

AutoReconnect: localhost:27017: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted

Traceback
---------
  File "C:\ProgramData\Anaconda3\lib\site-packages\dask\async.py", line 266, in execute_task
    result = _execute_task(task, data)
  File "C:\ProgramData\Anaconda3\lib\site-packages\dask\async.py", line 247, in _execute_task
    return func(*args2)
  File "C:\Git_repository\footie\Pipeline\errordemo.py", line 20, in functionToBeRunParallel
    dataFromMongo = pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))
  File "C:\ProgramData\Anaconda3\lib\site-packages\pandas\core\frame.py", line 981, in from_records
    first_row = next(data)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\cursor.py", line 1090, in next
    if len(self.__data) or self._refresh():
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\cursor.py", line 1012, in _refresh
    self.__read_concern))
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\cursor.py", line 850, in __send_message
    **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\mongo_client.py", line 844, in _send_message_with_response
    exhaust)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\mongo_client.py", line 855, in _reset_on_error
    return func(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\server.py", line 99, in send_message_with_response
    with self.get_socket(all_credentials, exhaust) as sock_info:
  File "C:\ProgramData\Anaconda3\lib\contextlib.py", line 82, in __enter__
    return next(self.gen)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\server.py", line 163, in get_socket
    with self.pool.get_socket(all_credentials, checkout) as sock_info:
  File "C:\ProgramData\Anaconda3\lib\contextlib.py", line 82, in __enter__
    return next(self.gen)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 582, in get_socket
    sock_info = self._get_socket_no_auth()
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 618, in _get_socket_no_auth
    sock_info, from_pool = self.connect(), False
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 555, in connect
    _raise_connection_failure(self.address, error)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 65, in _raise_connection_failure
    raise AutoReconnect(msg)

UPDATE:

Following this post, I created a decorator to catch the AutoReconnect exception, as shown below. Together with the parameters for MongoClient, the looping works, but it is still slow, taking double the time it should (timings on the Azure VM):

500 iterations: 3.74 s
50,000 iterations: 12 min 12 s

import random
from time import sleep

import pandas as pd
from pymongo import MongoClient, errors


def safe_mongocall(call):
    def _safe_mongocall(*args, **kwargs):
        for i in range(5):
            try:
                return call(*args, **kwargs)
            except errors.AutoReconnect:
                sleep(random.random() / 100)
        print('Error: Failed operation!')
    return _safe_mongocall


@safe_mongocall
def functionToBeRunParallel(self, i):

    # Read data from mongo
    with MongoClient(connect=False, maxPoolSize=None, maxIdleTimeMS=100) as client:
        db = client.errordemo
        dataFromMongo = pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))

    # Randomize data
    dataRand = dataFromMongo.apply(pd.to_numeric).apply(rand, volatility=0.1)

    # Sum rows
    dataSum = dataRand.sum(axis=1)

    # Select randomly one of the resulting values and return
    return dataSum.sample().values[0]
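
For reference, the retried call above still opens and closes a fresh MongoClient on every task, which keeps creating client-side sockets. A minimal sketch of reusing a single client per worker process instead (an assumption, not what was actually done here; the randomize/sum/sample steps would stay unchanged):

import pandas as pd
from pymongo import MongoClient

_client = None  # one MongoClient per worker process, created lazily

def read_errordemo():
    # Reuse the per-process client across tasks instead of opening a new
    # connection inside every call.
    global _client
    if _client is None:
        _client = MongoClient(connect=False)
    db = _client.errordemo
    return pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))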
Mittal

The actual problem was TCP/IP port exhaustion, so the solution was to avoid exhausting the ports. Following Microsoft's article, I added the following registry entries and values under HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters:

MaxUserPort: 65534
TcpTimedWaitDelay: 30
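
The same values can also be set programmatically; a small sketch using Python's standard winreg module (an assumption about how one might script the change; it must run with administrator rights, and a reboot is typically needed before the Tcpip parameters take effect):

import winreg

# Write the two DWORD values under the Tcpip parameters key.
key_path = r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters"
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, key_path, 0, winreg.KEY_SET_VALUE) as key:
    winreg.SetValueEx(key, "MaxUserPort", 0, winreg.REG_DWORD, 65534)
    winreg.SetValueEx(key, "TcpTimedWaitDelay", 0, winreg.REG_DWORD, 30)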
