Expert's guide to Celery Tasks in Python
In this Python & Celery tutorial we will deep dive into Celery tasks. We'll learn how to create tasks and complex workflows using Celery.
Celery is an open-source, distributed task queue written in Python. It lets you run time-consuming operations in the background without blocking your main application flow. In this article, we'll be covering Celery tasks in detail; if you're looking for a beginner-friendly introduction, head over to our Celery Quickstart guide tutorial.
A Celery task is the smallest unit of work that Celery can execute. It's a Python function that is registered with Celery and can be executed on an external trigger. In other words, a Celery task is simply a Python function decorated with the `@app.task` decorator.
Here's an example of a simple Celery task that sums two numbers and returns the result:

```python
from celery import Celery

app = Celery('my_app', broker='amqp://guest@localhost//')

@app.task
def sum(x, y):
    return x + y
```
Copy the code above and save it as `tasks.py`.
Now, let's run this task using Celery.
To run a Celery task, we first need to start a Celery worker. A Celery worker does two primary things:
- Knows all the tasks defined under the current Celery app
- Executes tasks when triggered
To start a Celery worker, run the following command in your terminal:

```shell
celery -A tasks worker --loglevel=info
```

Here `tasks` is the name of the module that contains your Celery app (the `tasks.py` file we created above).
Now that we have our worker up and running, we need to trigger the task. Triggering a task means sending a message to the broker asking a worker to execute the task with the specified arguments.
To trigger a task, we import it and call it. There are several methods available for calling a task:
- `delay()`: triggers the task asynchronously. It returns an `AsyncResult` object that can be used to get the result of the task.
- `apply_async()`: also triggers the task asynchronously and returns an `AsyncResult` object, but additionally accepts execution options such as `countdown`, `eta`, and `queue`.
- `apply()`: triggers the task synchronously. It blocks the current thread until the task has executed, and returns the result. This method is generally not recommended, as it blocks your application flow.
To summarize, you can call your tasks in two ways:

- Synchronously: using the `apply()` method, or by calling `get()` on the `AsyncResult` returned by an asynchronous call.
- Asynchronously: using the `delay()` or `apply_async()` methods.
Here's an example of how to call a task using the `delay()` method:

```python
>>> from tasks import sum
>>> result = sum.delay(2, 3)
```
Celery tasks have built-in support for various useful features, such as:
- Task Priorities
- Task States
- Task Events
- Task Results
- Task Exceptions
- Task Signatures
We'll be covering some of these features in this article; see the official documentation for the rest.
Celery comes with built-in support for retrying tasks on failure. You can specify the maximum number of retries and the delay between them. To make a task retry on failure, bind the task (`bind=True`) and call `self.retry()` from inside it; the retry limit and default delay are set via the `max_retries` and `default_retry_delay` options on the `@app.task` decorator.
Here's an example of a task with retries. It retries up to 3 times, waiting 5 seconds between attempts:

```python
from celery import Celery

app = Celery('my_app', broker='amqp://guest@localhost//')

@app.task(bind=True, max_retries=3, default_retry_delay=5)
def my_task(self, arg1, arg2):
    try:
        ...  # do something that may fail
    except Exception as exc:
        # raise self.retry() re-queues the task for another attempt
        raise self.retry(exc=exc)
```
You can also use a different delay for each retry attempt by computing the `countdown` passed to `self.retry()` from the current retry count, available as `self.request.retries`. For example:

```python
@app.task(bind=True, max_retries=3)
def my_task(self, arg1, arg2):
    try:
        ...  # do something that may fail
    except Exception as exc:
        # self.request.retries is 0 on the first run, 1 on the first
        # retry, and so on.
        raise self.retry(exc=exc, countdown=5 * (self.request.retries + 1))
```

This retries the task with a delay of 5 seconds on the first retry, 10 seconds on the second, and 15 seconds on the third.
Besides this, you can also use Celery's task handlers to run custom logic on retry. The `on_retry()` handler can be overridden on a custom task base class; here's an example that logs each retry attempt:

```python
import logging

from celery import Celery, Task

app = Celery('my_app', broker='amqp://guest@localhost//')
logger = logging.getLogger(__name__)

class LoggingTask(Task):
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.warning('Retrying task %s with args %s (exc: %r)',
                       task_id, args, exc)

@app.task(bind=True, base=LoggingTask, max_retries=3, default_retry_delay=5)
def my_task(self, arg1, arg2):
    try:
        ...  # do something that may fail
    except Exception as exc:
        raise self.retry(exc=exc)
```
When a Celery task fails, it ends in the FAILURE state and its result is set to the exception raised by the task, along with the traceback. You can change this behavior either by configuring retries as shown above, or by overriding the task's `on_failure()` handler to run custom logic.
Celery tasks are asynchronous by default: when you trigger a task, it is executed in the background and the current thread is not blocked. You can also run a task synchronously by using the `apply()` method, or block on an asynchronous task by calling the `get()` method on its `AsyncResult`.
Celery tasks live in your application code: the worker imports your task modules at startup and keeps them in memory. The trigger messages themselves are stored in the broker (such as RabbitMQ or Redis) until a worker picks them up. You can read more about brokers in the official documentation.
You can get data back from a Celery task via Celery's result backend, accessed through the `AsyncResult` object that is returned when you trigger a task. You can use the `AsyncResult` object both to get the result of the task and to check its status.
And that's all! You now know how to use Celery tasks and their various features.
In this tutorial, we've summarized the main features available with Celery tasks. Celery tasks have many more features than we've covered in this article; we recommend going through the official documentation to learn more.
We'll soon be posting in-depth articles on various advanced Celery topics. I hope you enjoyed this tutorial. If you have any questions or suggestions, feel free to leave a comment below. Thanks for reading!