首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >celery :收到了类型为“celery_deploy.tasks._stocks”的未注册任务

celery :收到了类型为“celery_deploy.tasks._stocks”的未注册任务
EN

Stack Overflow用户
提问于 2020-11-10 23:55:01
回答 1查看 768关注 0票数 2

当我运行celery时,如下所示:celery -A bookmarks worker -l INFO

它返回:

代码语言:javascript
复制
[2020-11-10 23:14:21,649: ERROR/MainProcess] Received unregistered task of type 'celery_deploy.tasks._stocks'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
KeyError: 'celery_deploy.tasks._stocks'

我在项目书签目录( Django项目settings.py所在的目录)中的celery.py:

代码语言:javascript
复制
from __future__ import  absolute_import, unicode_literals
import os
from celery import Celery, platforms
from datetime import timedelta
from . import settings
BROKER_URL = 'redis://localhost:6379/0'
BACKEND_URL = 'redis://localhost:6379/1'
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'bookmarks.settings')
app = Celery('bookmarks', backend=BACKEND_URL, broker=BROKER_URL)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda : settings.INSTALLED_APPS)
platforms.C_FORCE_ROOT = True

app.conf.update(
    CELERY_ACKS_LATE = True,
    CELERY_ACCEPT_CONTENT = ['json'],
    CELERY_FORCE_EXECV = True,
    CELERY_CONCURRENCY = 4,
    CELERYD_MAX_TASKS_PER_CHILD = 10,
    CELERYBEAT_SCHEDULE = {
        'get_stock_': {
            'task': 'celery_deploy.tasks._stocks',
            'schedule': timedelta(seconds=21),
        },
    },
)

@app.task(bind = True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

我在Django应用程序celery_deploy中的tasks.py:

代码语言:javascript
复制
from __future__ import absolute_import
from celery import shared_task
from django.core.cache import cache
from urllib.request import urlopen
from bookmarks.celery import app
import json
import redis

apibase = "https://financialmodelingprep.com/api/v3/"
apikey = "[KEY]"
exchange_open_day = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri']
json_parsed = {}
CACHE_TIMEOUT = 60*60

@app.task
def _stocks():
    url = apibase + "quotes/nyse" + "?" + apikey
    response = urlopen(url)
    stocks_parsed = json.loads(response.read().decode("utf-8"))
    for _object in stocks_parsed:
        cache.set(_object['symbol'], _object['price'], CACHE_TIMEOUT)

我读了很多和我类似的问题,但还是想不通。

我该怎么做才能修复它呢?谢谢。

EN

回答 1

Stack Overflow用户

发布于 2020-11-11 02:13:11

这个错误很可能与您的Instantiaion有关。假设您正在尝试App-Wide usage,则由于需要设置task_cls属性,因此以下命令将不起作用。

这里

代码语言:javascript
复制
app = Celery('bookmarks', backend=BACKEND_URL, broker=BROKER_URL)

bookmarks引用了您的目录,但在运行时无法导入所需的方法。

你能把这一行改成包括你的文件名吗?

代码语言:javascript
复制
app = Celery('bookmarks.tasks', backend=BACKEND_URL, broker=BROKER_URL)

这将有望解决这个问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64772262

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档