在 Python 中的多个进程之间共享存储对象的字典

科隆德

我正在编写一个大型脚本,其主要目的是读取许多文件的内容并在字典中存储每个元素的数量。如果字典中不存在该元素,那么我们正在创建某个对象的新实例,然后递增,否则仅递增。由于要处理的每个文件本身都很大,有时我需要处理 100 多个文件,因此我想稍微加快速度并利用 Python 的多处理模块。这是脚本的大大简化版本(我用...隐藏了路径,它不是真正的):

import multiprocessing as mp
from os import listdir
from os.path import join

manager = mp.Manager()
queue = manager.Queue()
dictionary = manager.dict()

class TestClass:
    def __init__(self):
        self._number = 0

    def increment(self):
        self._number += 1

def worker(file):
    f = open(file, 'r')
    for line in f.readlines():
        if line not in dictionary:
            dictionary[line] = TestClass()

        dictionary[line].increment()

def _list_files():
    for f in listdir("..."):
        queue.put(join("...", f))

def pool():
    _list_files()
    _pool = mp.Pool(mp.cpu_count())    

    for i in range(len(queue)):
        _pool.apply(worker, args=(queue.get()))

    _pool.close()
    _pool.join()

pool()
print(dictionary)

问题是脚本崩溃并显示以下消息:

AttributeError: Can't get attribute 'TestClass' on <module '__main__' from '.../multiprocessing_test.py'>  

有什么办法可以让它发挥作用吗?
我不是创建脚本初始版本的人,我只是向其中添加了一些功能。鉴于此,剧本的结构必须保持不变,因为重写它会花费太多时间,那就是TestClassworkerlist_files不能改变它们的结构(除了多道连接所有的东西)

周二

(好像你之前发布过这个问题。)

由于多种原因,您的示例代码不起作用,其中最重要的是...它没有做任何有用的事情:

$ python tst.py
Traceback (most recent call last):
  File "tst.py", line 38, in <module>
    pool()
  File "tst.py", line 29, in pool
    _list_files()
  File "tst.py", line 25, in _list_files
    for f in listdir("..."):
OSError: [Errno 2] No such file or directory: '...'

(发布无法运行的代码不是一种好形式,但提供MCVE个好主意。)所以我修复了这个问题:

index 39014ff..1ac9f4a 100644
--- a/tst.py
+++ b/tst.py
@@ -2,6 +2,8 @@ import multiprocessing as mp
 from os import listdir
 from os.path import join

+DIRPATH = 'inputs'
+
 manager = mp.Manager()
 queue = manager.Queue()
 dictionary = manager.dict()
@@ -22,8 +24,8 @@ def worker(file):
         dictionary[line].increment()

 def _list_files():
-    for f in listdir("..."):
-        queue.put(join("...", f))
+    for f in listdir(DIRPATH):
+        queue.put(join(DIRPATH, f))

 def pool():
     _list_files()

并创建了一个inputs/包含一个示例输入文件目录:

$ ls inputs
one
$ cat inputs/one
1
one
unum

现在这个例子产生:

$ python tst.py
Traceback (most recent call last):
  File "tst.py", line 40, in <module>
    pool()
  File "tst.py", line 34, in pool
    for i in range(len(queue)):
TypeError: object of type 'AutoProxy[Queue]' has no len()

现在,我不会声称这种重写是好的,但我继续将其重写为确实有效的内容:

import multiprocessing as mp
from os import listdir
from os.path import join

DIRPATH = 'inputs'

class TestClass:
    def __repr__(self):
        return str(self._number)

    def __init__(self):
        self._number = 0

    def increment(self):
        self._number += 1

def worker(dictionary, queue):
    while True:
        path = queue.get()
        if path is None:
            return
        f = open(path, 'r')
        for line in f.readlines():
            if line not in dictionary:
                dictionary[line] = TestClass()
            dictionary[line].increment()

def run_pool():
    manager = mp.Manager()
    queue = manager.Queue()
    dictionary = manager.dict()
    nworkers = mp.cpu_count()
    pool = mp.Pool(nworkers)

    for i in range(nworkers):
        pool.apply_async(worker, args=(dictionary, queue))

    for f in listdir(DIRPATH):
        queue.put(join(DIRPATH, f))
    for i in range(nworkers):
        queue.put(None)

    pool.close()
    pool.join()

    return dictionary

def main():
    dictionary = run_pool()
    print(dictionary)

if __name__ == '__main__':
    main()

主要区别是:

  • 我删除了所有全局变量。管理器实例、管理队列和管理字典都是run_pool.

  • 创建工人后,我将文件的名称放入队列中nworker每个 worker 运行一个循环,读取文件名,直到它读取一个None名称,然后返回它的 (None) 结果。

  • 主循环将文件名放入队列,以便工作人员可以在完成每个前一个文件时将文件名从队列中取出。为了通知所有nworkers工作人员退出,主循环将那么多None条目添加到队列中。

  • run_pool 返回最终的(仍然托管的)字典。

当然,我__repr__在您的TestClass对象中添加了一个以便我们可以看到计数。我还通过将main驱动程序移动到一个函数中来确保代码应该在 Windows 上工作,只有在__name__ == '__main__'.

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

是否可以在进程之间传递Python Future对象?

在python中的不同进程之间共享列表

Python在进程之间共享锁

在Python进程之间共享复杂的对象?

python服务器中进程之间的共享列表

在Python进程之间共享内存中的大型数据结构?

在Python中的进程之间共享许多队列

可以在Python进程之间共享set()吗?

在python3中的多进程之间共享python对象

通过Python中的套接字在两个进程之间传递共享内存对象

Python多重处理:RuntimeError:“队列对象仅应通过继承在进程之间共享”

Python如何在多个进程之间共享内存?

Python:在生成的进程之间共享锁

Python在多处理进程之间共享双端队列

在单独的进程之间共享内存中的复杂python对象

在python中的进程之间共享对象

在Python中的进程之间共享列表的正确方法?

在python进程之间共享tkinter窗口对象

如何在Python中有效地在多个线程和进程之间共享数据?

在多个python进程之间共享RabbitMQ通道

Python多处理:在进程之间共享数据

如何在python的进程之间共享数据?

在python中的两个子进程之间共享匿名mmap

python multiprocessing-在进程之间共享类的字典,并将后续从进程反映到共享内存的写入

在Python进程之间共享具有对Tasklet(协程)具有写访问权的嵌套对象?

多个进程之间的多进程同步(Python)

我如何在 Python 中的 2 个不同进程之间共享串行端口

如何使用python sqlite3包在python中的不同进程之间共享:memory:数据库

如何在 Python 3 中的进程之间共享变量?