Vasu D.

8 mins read


Expert's guide to Celery Tasks in Python

In this Python & Celery tutorial we will deep dive into Celery tasks. We'll learn how to create tasks and complex workflows using Celery.


What is Celery ?

Celery is an open-source, distributed task queue written in Python. It lets you run time-consuming operations in the background without blocking your main application flow. In this article, we'll cover Celery tasks in detail. If you are looking for a beginner-friendly introduction, head over to our Celery Quickstart guide.

What are Celery Tasks ?

A Celery task is the smallest unit of work that Celery can execute. It is a Python function that is registered with a Celery app, so that a worker can run it whenever it is triggered. To create one, decorate a Python function with the @app.task decorator.

Here's an example of a simple Celery task that will sum two numbers and return the result :

from celery import Celery
 
app = Celery('my_app', broker='amqp://guest@localhost//')
 
@app.task
def sum(x, y):
    return x + y

Copy the code above and save it as tasks.py.

How to run Celery tasks ?

Now, let's run this task using Celery.

To run a Celery task, first we need to start a Celery worker. A Celery worker primarily does two things :

  • Knows all the tasks defined under current Celery app
  • Executes tasks when triggered

To start a Celery worker, run the following command in your terminal :

celery -A tasks worker --loglevel=info

Here, tasks is the name of the module (tasks.py) that contains your Celery app.

Now that we have our worker up and running, we need to trigger this task. Triggering a task means sending a message through the broker that asks a worker to execute the task with the specified arguments.

To trigger a task, we need to import the task and call it. There are multiple methods available to call a task.

  1. Using delay() method : This method is used to trigger a task asynchronously. It returns an AsyncResult object that can be used to get the result of the task.
  2. Using apply_async() method : This method is also used to trigger a task asynchronously; delay() is a shortcut for it. It takes the task arguments as args and kwargs, accepts execution options such as countdown, and returns an AsyncResult object just like delay().
  3. Using apply() method : This method executes the task synchronously in the current process, without sending it to a worker. It blocks the current thread until the task has finished and returns an EagerResult object that holds the result. Outside of testing and debugging, this method is not recommended as it blocks your application flow.

To summarize, you can call your tasks in two ways :

  1. Synchronously : Using apply() method, or by calling the .get() method on an AsyncResult object, which blocks until the worker has finished the task.
  2. Asynchronously : Using delay() or apply_async() methods.

Here's an example of how to call a task using delay() method :

>>> from tasks import sum
>>> result = sum.delay(2, 3)

Features of Celery tasks :

Celery tasks have built-in support for various useful features like :

  • Retries
  • Timeouts
  • Chaining
  • Groups
  • Chords
  • Scheduling
  • Task Priorities
  • Task States
  • Task Events
  • Task Results
  • Task Exceptions
  • Task Signatures

We'll be covering some of these features in this article. You can read more about these features here.

Celery task Retries

Celery comes with built-in task retries on failure. You can specify the maximum number of retries and the delay between retries. To configure a Celery task to retry in case of failure, create it with bind=True, set max_retries and default_retry_delay in the @app.task decorator, and call self.retry() from the task body.

Here's an example of a task with retries - this example will retry the task 3 times with a delay of 5 seconds between each retry :

from tasks import app  # the Celery app defined in tasks.py

# bind=True passes the task instance as self, which provides self.retry()
@app.task(bind=True, max_retries=3, default_retry_delay=5)
def my_task(self, arg1, arg2):
    try:
        # Do something here
        pass
    except Exception as exc:
        # raise self.retry() will retry the task
        raise self.retry(exc=exc)

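You can also use a different delay for each retry attempt. A call to self.retry() accepts a single countdown value rather than a list, so the delay has to be computed from self.request.retries, which holds the number of retries made so far. For example:

@app.task(bind=True, max_retries=3)
def my_task(self, arg1, arg2):
    try:
        ...  # Do something here
    except Exception as exc:
        raise self.retry(exc=exc, countdown=5 * (self.request.retries + 1))

This will retry the task with a delay of 5 seconds on the first retry, 10 seconds on the second retry, and 15 seconds on the third retry.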
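A common variation is to let the delay grow exponentially instead of spelling out each value. The sketch below is plain Python and only computes the schedule; backoff_delay and its base and cap parameters are hypothetical names chosen for illustration, not part of Celery.

```python
def backoff_delay(retries, base=5, cap=60):
    """Delay in seconds before the next retry: base * 2**retries, limited to cap."""
    return min(cap, base * 2 ** retries)


# Inside a task declared with bind=True, the call would look like:
#     raise self.retry(exc=exc, countdown=backoff_delay(self.request.retries))
schedule = [backoff_delay(n) for n in range(5)]
print(schedule)  # [5, 10, 20, 40, 60]
```

Celery can also do this for you: the autoretry_for, retry_backoff, retry_backoff_max and retry_jitter task options retry automatically with exponential backoff. Check the documentation for your Celery version before relying on their exact behavior.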

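Other than this, you can also use Celery task handlers to run custom logic on retry. Handlers are methods of the task class, so you define them on a custom base class and pass that class to the decorator with the base argument. Here's an example of how you can use the on_retry handler to log each retry attempt :

import logging

from celery import Task
from tasks import app  # the Celery app defined in tasks.py

logger = logging.getLogger(__name__)

class LoggingTask(Task):
    # Called by the worker every time the task is retried
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.warning('Retrying task %s with args: %s (exc: %r)', task_id, args, exc)

@app.task(bind=True, base=LoggingTask, max_retries=3, default_retry_delay=5)
def my_task(self, arg1, arg2):
    try:
        # Do something here
        pass
    except Exception as exc:
        raise self.retry(exc=exc)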

Frequently Asked questions

What happens when a celery task fails ?

When a Celery task fails, the task ends in the FAILURE state and its result is set to the exception raised by the task, with the traceback stored alongside it. You can change this behavior by retrying the task on exceptions, or run custom logic on failure by using the on_failure handler.

Are Celery tasks Asynchronous ?

Celery tasks are asynchronous by default. This means that when you trigger a task with delay() or apply_async(), the task is executed in the background by a worker and the current thread is not blocked. You can wait for a task by calling the get() method on its AsyncResult object, which blocks until the result is ready, or run it synchronously in the current process by using the apply() method.

Where are celery tasks stored ?

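The code of a Celery task lives in your project's Python modules on disk; a worker imports those modules and registers the tasks in memory when it starts. When you trigger a task, a message describing the call is stored in the broker's queue (for example RabbitMQ or Redis) until a worker picks it up. Task results are stored in the result backend, which can be a database. You can read more about this here.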

How do I get data from celery task ?

You can get data from a Celery task through Celery's result backend, which must be configured for results to be stored. When you trigger a task, you get back an AsyncResult object. Call its get() method to retrieve the task's return value, or check its state attribute or its ready() method to see the status of the task.

Conclusion

And that's all. Now you know how to use Celery tasks and their different features.

In this tutorial, we've tried to summarize the different features available with Celery tasks. Celery tasks have many more features than what we've covered in this article. We recommend going through the official documentation to learn more about Celery tasks. You can read the official documentation here.

We'll soon be posting in-depth articles on various advanced celery topics. I hope you enjoyed this tutorial. If you have any questions or suggestions, feel free to leave a comment below. Thanks for reading!

