Django Celery定期任务示例

承租人

我需要一个最小的示例来执行定期任务(每5分钟运行一次功能,或在12:00:00运行某项功能,等等)。

在我的myapp/tasks.py,我有,

from celery.task.schedules import crontab
from celery.decorators import periodic_task
from celery import task


@periodic_task(run_every=(crontab(hour="*", minute=1)), name="run_every_1_minutes", ignore_result=True)
def return_5():
    return 5


@task
def test():
    return "test"

当我运行芹菜工作者时,它确实显示了任务(如下所示),但是没有返回任何值(在终端或花中)。

[tasks]
  . mathematica.core.tasks.test
  . run_every_1_minutes

请提供一个最小的示例或提示以达到预期的效果。

背景:

我有一个config/celery.py包含以下内容的:

import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.local")

app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

在我的config/__init__.py身上

from .celery import app as celery_app

__all__ = ['celery_app']

我在下面添加了一个类似下面的功能 myapp/tasks.py

from celery import task

@task
def test():
    return "test"

当我test.delay()从shell运行时,它成功运行,并且还在花中显示了任务信息

永远不要走

要运行定期任务,您还应该运行芹菜节拍您可以使用以下命令运行它:

celery -A proj beat

或者,如果您使用的是一名工人:

celery -A proj worker -B

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章