TLDR

When you start a Celery worker with celery -A jobs worker -l INFO, Celery will look for:

  • An app or celery attribute in jobs: jobs:app or jobs:celery
  • A celery submodule with an app or celery attribute: jobs.celery:app

If you aren’t following these patterns, you need to specify where the Celery app is. For a jobs package with a tasks submodule and a Celery app declared as celery_app, the command should be:

celery --app jobs.tasks:celery_app worker -l INFO

Not using a piece of tech for a while can present relearning headaches that make you ask yourself “why didn’t I just write this down somewhere the first time?”. That’s what this TIL is.

Before we start though, Celery is a distributed task queue system written in Python. It can be compared to other Python task queue systems like RQ or Dramatiq.

Some background and the issue

Throughout the documentation you’ll find that starting a Celery worker is as simple as executing

celery -A tasks worker --loglevel INFO

When strictly following the getting started guide you should have no problems and things will work as expected. Beyond this simple use, though, you will want to understand how the -A or --app option works, especially when adding a Celery application to a new package in an existing project.

Since I’m refreshing my Celery knowledge after years of not using it, I decided to keep it simple and stay as close as possible to what is found in the Celery documentation. The only difference was that I needed to call the Celery task from a FastAPI endpoint.

I’m starting off with this project layout (very similar to what you see in the docs)

❯ tree src/jobs
src/jobs
├── __init__.py
├── steps.py
└── tasks.py

1 directory, 3 files

The tasks.py file is where I’m configuring my Celery app and where I’m also defining my initial task, send_data. steps.py is currently empty.

from typing import Any

from celery import Celery
from app.config import config

# from .steps import get_usage_spans

celery_app = Celery("jobs", broker=config.REDIS_URI)


@celery_app.task
def send_data(data: dict[str, Any]):
    ...

    return "Send data request received: "

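The task was being triggered from a FastAPI endpoint, roughly along these lines (a minimal sketch; the route, function name, and payload shape here are made up for illustration, not copied from my project):

from fastapi import FastAPI

from jobs.tasks import send_data

app = FastAPI()


@app.post("/send-data")
def trigger_send_data(payload: dict):
    # .delay() serializes the call and puts it on the Redis broker;
    # a worker is expected to pick it up and run send_data asynchronously.
    result = send_data.delay(payload)
    return {"task_id": result.id}
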
Starting my worker presented no issues, but when the task was called during a request sent to the API, I got this error message:

[2024-09-22 18:14:45,756: ERROR/MainProcess] Received unregistered task of type 'jobs.tasks.send_data'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[["field1", "field2", "field3", "field4"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (123b)

The full contents of the message headers:
{'lang': 'py', 'task': 'jobs.tasks.send_data', 'id': 'b301e3bc-9ab9-48f6-b4c9-3158e1437a85', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'b301e3bc-9ab9-48f6-b4c9-3158e1437a85', 'parent_id': None, 'argsrepr': "('tenant', 'account', 'service_point', 'device')", 'kwargsrepr': '{}', 'origin': 'gen95044@froi.local', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}}

The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
  File "/Users/froi/Library/Caches/pypoetry/virtualenvs/data-integrations-iehcu-zB-py3.11/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 659, in on_task_received
    strategy = strategies[type_]
               ~~~~~~~~~~^^^^^^^
KeyError: 'jobs.tasks.send_data'

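As a sanity check, printing the task registry of an app instance shows exactly what it has registered, something like this (a small debugging snippet, not part of the original session):

from jobs.tasks import celery_app

# The registry holds Celery's built-in celery.* tasks plus anything
# registered against this app instance via @celery_app.task.
print(sorted(celery_app.tasks.keys()))
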
Thus the moral of this story.

The --app argument

Starting a worker is simple

celery -A workers worker -l INFO

The -A or --app option is used to specify the Celery application instance to use. What I didn’t remember was that it follows a module.path:attribute pattern. I also didn’t remember the defaults it falls back to.

I was starting my workers with

celery -A jobs worker -l INFO

This meant that Celery would look for the configured application by:

  1. Looking for an app or celery attribute in the jobs module, the equivalent of writing celery -A jobs:app worker -l INFO (see the sketch after this list)
  2. Or looking for an app or celery attribute in a celery submodule of the jobs package, the equivalent of writing celery -A jobs.celery:app worker -l INFO

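For example, the first default would have matched if the package re-exported the app instance at the top level, something like this hypothetical jobs/__init__.py (not what my project actually had):

# jobs/__init__.py (hypothetical layout, not the one in this project)
# Re-exporting the Celery instance as `app` lets `celery -A jobs worker -l INFO`
# resolve it as jobs:app.
from jobs.tasks import celery_app as app
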
Since I didn’t declare any of my resources this way, I needed to start my worker with

celery -A jobs.tasks:celery_app worker -l INFO

Once I finally understood this again, like magic, my code worked. Hope this helps somebody not have to scream at their monitor.