芹菜不适用于全局变量

ren
from celery import Celery

app = Celery('tasks', backend='amqp://guest@localhost//', broker='amqp://guest@localhost//')

a_num = 0

@app.task
def addone():
    global a_num
    a_num = a_num + 1
    return a_num

这是我用来测试芹菜的代码。我希望每次使用addone()时,返回值都应该增加。但是为什么总是1?

结果

python
>> from tasks import addone
>> r = addone.delay()
>> r.get()
   1
>> r = addone.delay()
>> r.get()
   1
>> r = addone.delay()
>> r.get()
   1
路易

默认情况下,启动工作程序时,Celery以并发4启动它,这意味着它已启动4个进程来处理任务请求。(加上一个控制其他进程的进程。)我不知道该使用哪种算法将任务请求分配给为工作人员启动的进程,但是最终,如果执行addone.delay().get()足够,您将看到该数字大于1。发生的事情是每个进程(不是每个任务)都有自己的副本a_num当我在这里尝试时,我第五次执行addone.delay().get()return 2

您可以通过使用单个进程来启动您的工作程序来处理请求,从而强制每次增加该数量。(例如celery -A tasks worker -c1)但是,如果您重新启动工作程序,则编号将重置为0。此外,我不会设计仅在将处理请求的进程数强制为1时才起作用的代码。决定多个进程应处理对任务的请求,然后事情中断。(代码中注释中的粗大警告只能起到很大作用。)

最终,这种状态应该在缓存(例如Redis)或用作缓存的数据库中共享,这将适用于您问题中的代码。

但是,您在评论中写道:

让我们看看我想使用任务发送一些东西。我不想共享每次任务,而是共享一个全局连接。

将连接存储在缓存中将不起作用。我强烈建议让Celery开始的每个进程都使用其自己的连接,而不是尝试在各个进程之间共享它。不需要随每个新任务请求重新打开连接。每个进程将其打开一次,然后该进程处理的每个任务请求将重用连接。

在许多情况下,尝试在进程之间共享相同的连接(例如,通过通过共享虚拟内存fork)将无法正常工作。连接通常带有状态(例如,数据库连接是否处于自动提交模式)。如果代码的两部分期望连接处于不同的状态,则代码将运行不一致。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章