pytest功能测试多处理任务队列服务

威廉

我有一个尝试在其上运行pytest功能测试的任务队列处理服务。在“生产”中运行它时,我从命令行启动它,例如python main.py

我不知道如何从pytest启动此任务服务以对其进行功能测试。如何在pytest中启动服务,以便随后可以向其中添加作业,并在完成后查看该作业是否得到处理并添加到数据库中?

def main():
    store = "jobs"
    worker_id = 1
    # Process tasks
    task_processing[store] = multiprocessing.Process(
            target=process_tasks, args=(store, worker_id)
        )
    nanopub_processing[store].start()


if __name__ == "__main__":
    main()

马蹄铁

只要确保您main正确访问该函数即可:

from main import main

def test_main():
    main()
    ...

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章