我有一个尝试在其上运行pytest功能测试的任务队列处理服务。在“生产”中运行它时,我从命令行启动它,例如python main.py
。
我不知道如何从pytest启动此任务服务以对其进行功能测试。如何在pytest中启动服务,以便随后可以向其中添加作业,并在完成后查看该作业是否得到处理并添加到数据库中?
def main():
store = "jobs"
worker_id = 1
# Process tasks
task_processing[store] = multiprocessing.Process(
target=process_tasks, args=(store, worker_id)
)
nanopub_processing[store].start()
if __name__ == "__main__":
main()
只要确保您main
正确访问该函数即可:
from main import main
def test_main():
main()
...
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句