TIL: Celery and the --app argument
TLDR
Starting a Celery worker with celery -A jobs worker -l INFO will look for:
- An app or celery attribute in jobs: jobs:app or jobs:celery
- A celery submodule with an app or celery attribute: jobs.celery:app
If you aren’t following these patterns, you need to specify where the Celery app is. For a jobs package with a tasks submodule and a Celery app declared as celery_app, the command should be:
celery --app jobs.tasks:celery_app worker -l INFO
Not using a piece of tech for a while can present relearning headaches that make you ask yourself “why didn’t I just write this down somewhere the first time?”. That’s what this TIL is.
Before we start though, Celery is a distributed task queue system written in Python. It can be compared to other queue systems like
- Sidekiq for Ruby
- Bull for Node
- River Queue for Go
Some background and the issue
Throughout the documentation you’ll find that starting a Celery worker is as simple as executing
celery -A tasks worker --loglevel INFO
When strictly following the getting started guide, you should have no problems and things will work as expected. Beyond this simple use, though, you will want to understand how the -A or --app argument works, especially when adding a Celery application to a new package in an existing project.
Since I’m refreshing my Celery knowledge after years of not using it, I decided to keep it simple and as close as possible to what is found in the Celery documentation. The only difference was that I needed to call the Celery task from a FastAPI endpoint.
I’m starting off with this project layout (very similar to what you see in the docs)
❯ tree src/jobs
src/jobs
├── __init__.py
├── steps.py
└── tasks.py
1 directory, 3 files
The tasks.py file is where I’m configuring my Celery app and where I’m also defining my initial task, send_data. steps.py is currently empty.
from typing import Any

from celery import Celery
from app.config import config
# from .steps import get_usage_spans

# The Celery app lives in jobs/tasks.py under the name celery_app.
celery_app = Celery("jobs", broker=config.REDIS_URI)

@celery_app.task
def send_data(data: dict[str, Any]):
    ...
    return "Send data request received: "
Starting my worker presented no issues, but when the task was called during a request sent to the API, I got this error message:
[2024-09-22 18:14:45,756: ERROR/MainProcess] Received unregistered task of type 'jobs.tasks.send_data'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[["field1", "field2", "field3", "field4"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (123b)
The full contents of the message headers:
{'lang': 'py', 'task': 'jobs.tasks.send_data', 'id': 'b301e3bc-9ab9-48f6-b4c9-3158e1437a85', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'b301e3bc-9ab9-48f6-b4c9-3158e1437a85', 'parent_id': None, 'argsrepr': "('tenant', 'account', 'service_point', 'device')", 'kwargsrepr': '{}', 'origin': 'gen95044@froi.local', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}}
The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
  File "/Users/froi/Library/Caches/pypoetry/virtualenvs/data-integrations-iehcu-zB-py3.11/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 659, in on_task_received
    strategy = strategies[type_]
               ~~~~~~~~~~^^^^^^^
KeyError: 'jobs.tasks.send_data'
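The last lines of the traceback show the worker looking the task up by name (strategies[type_]) and failing with a KeyError. A toy model of that lookup, not Celery's actual code, might look like the sketch below. ToyWorker, register, and receive are hypothetical names used only for illustration.

```python
from typing import Any, Callable


class ToyWorker:
    """Toy stand-in for a worker that dispatches tasks by name."""

    def __init__(self) -> None:
        # Maps a task name such as "jobs.tasks.send_data" to a callable.
        self.strategies: dict[str, Callable[..., Any]] = {}

    def register(self, name: str, func: Callable[..., Any]) -> None:
        self.strategies[name] = func

    def receive(self, name: str, *args: Any) -> Any:
        # An unknown name raises KeyError, as in the traceback above.
        strategy = self.strategies[name]
        return strategy(*args)


def send_data(data: dict[str, Any]) -> str:
    return "Send data request received: "


# A worker that never registered the task knows nothing about its name.
worker = ToyWorker()
try:
    worker.receive("jobs.tasks.send_data", {"field": "value"})
except KeyError as exc:
    missing = exc.args[0]

# Once the task is registered under that name, the same message is handled.
worker.register("jobs.tasks.send_data", send_data)
result = worker.receive("jobs.tasks.send_data", {"field": "value"})
```

In this model the message only succeeds when the worker holds the app that the task was registered on.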
Thus the moral of this story.
The --app argument
Starting a worker is simple
celery -A workers worker -l INFO
The -A or --app argument is used to specify the Celery application instance to use. What I didn’t remember was that it follows a module.path:attribute pattern. I also didn’t remember the defaults it follows.
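To make the module.path:attribute pattern concrete, here is a minimal sketch of how such a string could be resolved with the standard library. It is an illustration under my own assumptions, not Celery's implementation. resolve_spec is a hypothetical helper, and the demo resolves a standard-library class so it runs without a jobs package.

```python
import importlib
from typing import Any


def resolve_spec(spec: str) -> Any:
    """Resolve 'module.path:attribute' to the object it names."""
    module_path, sep, attribute = spec.partition(":")
    if not sep or not attribute:
        raise ValueError(f"expected 'module.path:attribute', got {spec!r}")
    # Import the module on the left of the colon...
    module = importlib.import_module(module_path)
    # ...then fetch the attribute named on the right.
    return getattr(module, attribute)


# Stand-in for a spec such as "jobs.tasks:celery_app".
decoder_cls = resolve_spec("json.decoder:JSONDecoder")
```

The part before the colon is what gets imported, and the part after it is the name looked up on that module.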
I was starting my workers with
celery -A jobs worker -l INFO
This meant that Celery would look for the configured application by:
- Looking for an app or celery attribute in a jobs module, the equivalent of writing it as
celery -A jobs:app worker -l INFO
- Or looking for an app or celery attribute in a celery submodule of the jobs package, the equivalent of writing it as
celery -A jobs.celery:app worker -l INFO
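The lookup described in the list above can be written out as a list of explicit specs. This is a simplified sketch that covers only the patterns named in this post. default_candidates is a hypothetical helper, not part of Celery, and the real lookup may try more than this.

```python
def default_candidates(name: str) -> list[str]:
    """List the explicit specs a bare --app value stands for, in order."""
    if ":" in name:
        # An explicit module.path:attribute spec is used as given.
        return [name]
    attributes = ("app", "celery")
    # First the named module itself, then its celery submodule.
    modules = (name, f"{name}.celery")
    return [f"{module}:{attr}" for module in modules for attr in attributes]


candidates = default_candidates("jobs")
# ['jobs:app', 'jobs:celery', 'jobs.celery:app', 'jobs.celery:celery']
```

Note that jobs.tasks:celery_app does not appear among those candidates, so it has to be spelled out.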
Since I didn’t declare any of my resources in this way, I needed to start my worker with
celery -A jobs.tasks:celery_app worker -l INFO
Once I finally understood this again, like magic, my code worked. Hope this helps somebody not have to scream at their monitor.