如何加快同时读取和写入多处理队列的速度?

杰瑞德(Jared)

tl; dr-有没有办法提高同时读取和写入多处理队列的速度?

我有一个处理审核数据的应用程序。可以将其视为syslog中继。它接收数据,对其进行解析,然后继续发送事件。事件发生率可能很高-我正在每秒拍摄15,000个事件。

in_queue = multiprocessing.Queue()

out_queue = multiprocessing.Queue()

  • ReaderProc -单PROC,插槽读卡器,接收数据,并将其放在in_queue使用in_queue.put()
  • ParserProcs -多种特效,使用in_queue.get()获得的数据,处理数据,然后放入成品结果来out_queue使用out_queue.put()
  • WriterProc-单个proc,通过TCP套接字连接读取out_queue使用out_queue.get()并发送数据

我使用队列运行测试-我可以将事件放置拉入25,000 EPS的队列中。当多个解析过程(4)在将数据写入队列时将其从队列中拉出时,就会出现速度下降的情况。税率下降到低于10,000 EPS。我猜潜在的管道,锁等是造成延迟的原因。

我阅读了有关管道的内容,看起来它仅支持2个端点。我需要将CPU密集型分析分叉到多个proc。多处理内存共享等替代方法能否取得更好的结果?如何从队列中获得更好的同步.put().get()操作?

损害

考虑到您的性能需求,我认为最好使用ZeroMQRabbitMQ这样的第三方消息代理我在这里找到了一个比较多重的基准(尽管它与您的用例并不完全匹配)。性能上的差异是巨大的:

multiprocesing.Queue结果

1
2
3

python2 ./multiproc_with_queue.py
Duration: 164.182257891
Messages Per Second: 60907.9210414

0mq结果

1
2
3

python2 ./multiproc_with_zeromq.py
Duration: 23.3490710258
Messages Per Second: 428282.563744

我参加了这两个测试,并提供了更复杂的工作负载,因为它的好处之一multiprocessing.Queue是它可以为您处理序列化。这是新的脚本:

mult_queue.py

import sys
import time
from  multiprocessing import Process, Queue

def worker(q):
    for task_nbr in range(1000000):
        message = q.get()
    sys.exit(1)

def main():
    send_q = Queue()
    Process(target=worker, args=(send_q,)).start()
    msg = {
            'something': "More",
            "another": "thing",
            "what?": range(200),
            "ok": ['asdf', 'asdf', 'asdf']
            }
    for num in range(1000000):
        send_q.put(msg)

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 1000000 / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

multi_zmq.py

import sys
import zmq
from  multiprocessing import Process
import time
import json
import cPickle as pickle

def worker():
    context = zmq.Context()
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    for task_nbr in range(1000000):
        message = work_receiver.recv_pyobj()

    sys.exit(1)

def main():
    Process(target=worker, args=()).start()
    context = zmq.Context()
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")
    msg = {
            'something': "More",
            "another": "thing",
            "what?": range(200),
            "ok": ['asdf', 'asdf', 'asdf']
            }
    for num in range(1000000):
        ventilator_send.send_pyobj(msg)

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 1000000 / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

输出:

dan@dan:~$ ./mult_zmq.py 
Duration: 14.0204648972
Messages Per Second: 71324.3110935
dan@dan:~$ ./mult_queue.py 
Duration: 27.2135331631
Messages Per Second: 36746.4229657

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章