How To Use Celery with RabbitMQ to Queue Tasks on an Ubuntu VPS

Published on December 20, 2013
Introduction


Asynchronous, or non-blocking, processing is a method of separating the execution of certain tasks from the main flow of a program. This provides you with several advantages, including allowing your user-facing code to run without interruption.
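As a rough illustration of the idea, independent of Celery, the sketch below uses Python's standard `concurrent.futures` module to hand a slow function to a background thread while the main flow keeps going. It is written for Python 3, and the function name `slow_square` and the timings are made up for the example.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(n):
    """Stand-in for a long-running job (hypothetical example function)."""
    time.sleep(0.2)
    return n * n


executor = ThreadPoolExecutor(max_workers=1)

# submit() returns immediately with a Future; the work runs in another thread.
future = executor.submit(slow_square, 12)

# The main flow is free to do other things in the meantime.
handled_while_waiting = ["request-1", "request-2"]

# result() blocks only at the point where the answer is actually needed.
answer = future.result(timeout=5)
executor.shutdown()
print(answer)
```

A task queue such as Celery follows the same submit-now, collect-later shape, but the work is done by separate worker processes rather than a thread in the same program.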

Message passing is a method which program components can use to communicate and exchange information. It can be implemented synchronously or asynchronously and can allow discrete processes to communicate without problems. Message passing is often implemented as an alternative to traditional databases for this type of usage because message queues often implement additional features, provide increased performance, and can reside completely in-memory.
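To make asynchronous message passing concrete, here is a minimal in-memory sketch using only Python's standard library (Python 3, where the module is named `queue`). One thread puts messages on a queue and another consumes them. The message texts and the `None` sentinel are choices made for this example, and a real broker such as RabbitMQ does far more than this.

```python
import queue
import threading

messages = queue.Queue()   # in-memory message queue shared by both sides
received = []


def consumer():
    """Take messages off the queue until the sentinel arrives."""
    while True:
        message = messages.get()
        if message is None:   # sentinel: the producer has nothing more to send
            break
        received.append(message.upper())


worker = threading.Thread(target=consumer)
worker.start()

# The producer drops messages on the queue and does not wait for processing.
for text in ["resize image", "send email"]:
    messages.put(text)
messages.put(None)

worker.join()
print(received)
```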

Celery is a task queue that is built on an asynchronous message passing system. It can be used as a bucket where programming tasks can be dumped. The program that passed the task can continue to execute and function responsively, and then later on, it can poll celery to see if the computation is complete and retrieve the data.

While celery is written in Python, its protocol can be implemented in any language. It can even function with other languages through webhooks.

By implementing a job queue into your program’s environment, you can easily offload tasks and continue to handle interactions from your users. This is a simple way to increase the responsiveness of your applications and not get locked up while performing long-running computations.

In this guide, we will install and implement a celery job queue using RabbitMQ as the messaging system on an Ubuntu 12.04 VPS.

Install the Components


Install Celery


Celery is written in Python, and as such, it is easy to install in the same way that we handle regular Python packages.

We will follow the recommended procedures for handling Python packages by creating a virtual environment to install our messaging system. This helps us keep our environment stable and not affect the larger system.

Install the Python virtual environment package from Ubuntu’s default repositories:

sudo apt-get update
sudo apt-get install python-virtualenv

We will create a messaging directory where we will implement our system:

mkdir ~/messaging
cd ~/messaging

We can now create a virtual environment where we can install celery by using the following command:

virtualenv --no-site-packages venv

With the virtual environment configured, we can activate it by typing:

source venv/bin/activate

Your prompt will change to reflect that you are now operating in the virtual environment we made above. This will ensure that our Python packages are installed locally instead of globally.

If at any time you need to deactivate the environment (not now), you can type:

deactivate

Now that we have activated the environment, we can install celery with pip:

pip install celery

Install RabbitMQ


Celery requires a messaging agent in order to handle requests from an external source. This agent is referred to as a “broker”.

There are quite a few options for brokers available to choose from, including relational databases, NoSQL databases, key-value stores, and actual messaging systems.

We will be configuring celery to use the RabbitMQ messaging system, as it provides robust, stable performance and interacts well with celery. It is a great solution because it includes features that mesh well with our intended use.

We can install RabbitMQ through Ubuntu’s repositories:

sudo apt-get install rabbitmq-server

The RabbitMQ service is started automatically on our server upon installation.

Create a Celery Instance


In order to use celery’s task queuing capabilities, our first step after installation must be to create a celery instance. This is a simple process of importing the package, creating an “app”, and then setting up the tasks that celery will be able to execute in the background.

Let’s create a Python script inside our messaging directory called tasks.py where we can define tasks that our workers can perform.

nano ~/messaging/tasks.py

The first thing we should do is import the Celery class from the celery package:

from celery import Celery

After that, we can create a celery application instance that connects to the default RabbitMQ service:

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

The first argument to the Celery class is the name that will be prepended to tasks to identify them. Here it matches the name of our module, tasks.

The backend parameter is optional, but it is required if you wish to query the status of a background task or retrieve its results.

If your tasks are simply functions that do some work and then quit, without returning a useful value to use in your program, you can leave this parameter out. If only some of your tasks require this functionality, enable it here and we can disable it on a case-by-case basis further on.

The broker parameter specifies the URL needed to connect to our broker. In our case, this is the RabbitMQ service that is running on our server. RabbitMQ operates using a protocol called “amqp”. If RabbitMQ is operating under its default configuration, celery can connect with no information other than the amqp:// scheme.
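For reference, a broker URL can also spell out every part of the connection. The sketch below takes the commonly documented RabbitMQ defaults (user `guest`, password `guest`, `localhost`, port 5672, virtual host `/`), which are assumed here rather than taken from this guide, and splits the URL with the standard library so each piece is visible. It is written for Python 3.

```python
from urllib.parse import urlsplit

# Commonly documented RabbitMQ defaults written out in full (an assumption;
# adjust these if your RabbitMQ installation has been reconfigured).
explicit_url = "amqp://guest:guest@localhost:5672//"

parts = urlsplit(explicit_url)
print(parts.scheme)    # protocol used to reach the broker
print(parts.username)  # RabbitMQ user
print(parts.hostname)  # broker host
print(parts.port)      # broker port
print(parts.path)      # a "/" separator followed by the virtual host name "/"
```

If those defaults apply on your server, passing this longer string as the broker argument should behave the same as the bare amqp:// scheme.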

Build Celery Tasks


Still in this file, we now need to add our tasks.

Each celery task must be introduced with the decorator @app.task. This allows celery to identify functions that it can add its queuing functions to. After each decorator, we simply create a function that our workers can run.

Our first task will be a simple function that prints out a string to the console.

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task
def print_hello():
    print 'hello there'

Because this function does not return any useful information (it instead prints it to the console), we can tell celery to not use the backend to store state information about this task. This is less complicated under the hood and requires fewer resources.

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task(ignore_result=True)
def print_hello():
    print 'hello there'

Next, we will add another function that will generate prime numbers (taken from RosettaCode). This can be a long-running process, so it is a good example for how we can deal with asynchronous worker processes when we are waiting for a result.

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task(ignore_result=True)
def print_hello():
    print 'hello there'

@app.task
def gen_prime(x):
    multiples = []
    results = []
    for i in xrange(2, x+1):
        if i not in multiples:
            results.append(i)
            for j in xrange(i*i, x+1, i):
                multiples.append(j)
    return results

Because we care about what the return value of this function is, and because we want to know when it has completed (so that we may use the results, etc), we do not add the ignore_result parameter to this second task.
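As an aside, much of the running time of gen_prime comes from the `i not in multiples` test, which scans a list. The sketch below is a plain function, not a Celery task, that keeps the same sieve logic but stores the multiples in a set. The name `gen_prime_fast` is made up for this example, and it is not meant to be added to tasks.py: the slower version makes a better demonstration of a long-running job.

```python
def gen_prime_fast(x):
    """Return all primes up to and including x (hypothetical variant)."""
    multiples = set()   # set membership tests are constant time on average
    results = []
    for i in range(2, x + 1):
        if i not in multiples:
            results.append(i)
            # Mark every multiple of i, starting at i*i, as composite.
            multiples.update(range(i * i, x + 1, i))
    return results


print(gen_prime_fast(30))
```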

Save and close the file.

Start Celery Worker Processes


We can now start a worker process that will be able to accept connections from applications. It will use the file we just created to learn about the tasks it can perform.

Starting a worker instance is as easy as calling out the application name with the celery command. We will include a “&” character at the end of our string to put our worker process in the background:

celery worker -A tasks &

This will start the worker and run it in the background of your shell session, allowing you to continue to use the terminal for other tasks.

If you want to start multiple workers, you can do so by naming each one with the -n argument:

celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &

The %h will be replaced by the hostname when the worker is named.

To stop workers, you can use the kill command. We can query for the process id and then eliminate the workers based on this information. The [c] in the search pattern keeps grep from matching its own process:

ps auxww | grep '[c]elery worker' | awk '{print $2}' | xargs kill

By default, kill sends the TERM signal, which allows the worker to complete its current task before exiting.

If you wish to shut down all workers without waiting for them to complete their tasks, you can send the KILL signal instead:

ps auxww | grep '[c]elery worker' | awk '{print $2}' | xargs kill -9

Use the Queue to Handle Work


We can use the worker process(es) we spawned to complete work in the background for our programs.

Instead of creating an entire program to demonstrate how this works, we will explore the different options in a Python interpreter:

python

At the prompt, we can import our functions into the environment:

from tasks import print_hello
from tasks import gen_prime

If you test these functions, they appear to not have any special functionality. The first function prints a line as expected:

print_hello()

hello there

The second function returns a list of prime numbers:

primes = gen_prime(1000)
print primes

If we give the second function a larger range of numbers to check, the execution hangs while it calculates:

primes = gen_prime(50000)

Stop the execution by typing “CTRL-C”. This process is clearly not computing in the background.

Celery wraps our functions with additional capabilities. To hand a function to the background worker, we call its .delay method, which passes the call to a worker to execute and returns immediately:

primes = gen_prime.delay(50000)
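To see why calling the function directly runs it in the interpreter while .delay hands it off, it can help to look at a toy model of a wrapped function. The `ToyTask` class below is invented for this illustration and is not how Celery is implemented: it uses a thread where Celery would send a message through the broker to a worker process.

```python
import threading


class ToyTask(object):
    """Toy model of a task wrapper; not Celery's real implementation."""

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        # A direct call runs the function in the current process and blocks.
        return self.func(*args, **kwargs)

    def delay(self, *args, **kwargs):
        # delay() hands the call to something else (a thread here, a worker
        # process reached through the broker in Celery) and returns right away.
        outcome = {}

        def run():
            outcome["value"] = self.func(*args, **kwargs)

        thread = threading.Thread(target=run)
        thread.start()
        return thread, outcome


@ToyTask
def add(a, b):
    return a + b


direct = add(2, 3)                  # runs here and blocks until done
thread, outcome = add.delay(2, 3)   # returns before the work is finished
thread.join()                       # wait for the background call to finish
print(direct, outcome["value"])
```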

This task is now being executed by the workers we started earlier. Because we configured a backend parameter for our application, we can check the status of the computation and get access to the result.

To check whether the task is complete, we can use the .ready method:

primes.ready()

False

A value of “False” means that the task is still running and a result is not available yet. When we get a value of “True”, we can do something with the answer.

primes.ready()

True

We can get the value by using the .get method.

If we have already verified with the .ready method that the value has been computed, we can call .get like this:

print primes.get()

[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523,
. . .

If, however, you have not used the .ready method prior to calling .get, you most likely want to add a “timeout” option so that your program isn’t forced to wait for the result, which would defeat the purpose of our implementation:

print primes.get(timeout=2)

This will raise an exception (celery.exceptions.TimeoutError) if it times out, which you can handle in your program.
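Another option is to poll .ready in a loop and give up after a deadline. The helper below is a sketch under stated assumptions: `wait_for_result` and `FakeResult` are names invented for this example, and `FakeResult` is only a stand-in that mimics the .ready and .get methods so the code runs without a broker or worker.

```python
import time


def wait_for_result(async_result, timeout=10.0, interval=0.5):
    """Poll until the task is ready, then return its value.

    Works with any object exposing ready() and get(), as Celery's result
    objects do. Returns None if the timeout passes first.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        if async_result.ready():
            return async_result.get()
        time.sleep(interval)   # other work could be done here instead
    return None


class FakeResult(object):
    """Stand-in for a Celery result so the sketch runs without a broker."""

    def __init__(self, value, ready_after):
        self._value = value
        self._ready_at = time.time() + ready_after

    def ready(self):
        return time.time() >= self._ready_at

    def get(self):
        return self._value


finished = wait_for_result(FakeResult([2, 3, 5], ready_after=0.2), interval=0.05)
timed_out = wait_for_result(FakeResult([2, 3, 5], ready_after=5), timeout=0.2, interval=0.05)
print(finished, timed_out)
```

With a worker running, the equivalent call in the interpreter session above would be wait_for_result(gen_prime.delay(50000)).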

Conclusion


Although this is enough information to get you started on using celery within your programs, it is only scratching the surface of the full functionality of this library. Celery allows you to string background tasks together, group tasks, and combine functions in interesting ways.

Although celery is written in Python, it can be used with other languages through webhooks. This makes it incredibly flexible for moving tasks into the background, regardless of your chosen language.

By Justin Ellingwood

9 Comments


Hi, first of all, thanks, really great doc, but I want to set this up on Windows. Do you have a doc for that?

Much appreciated tutorial. Picks up where the getting started guide leaves off. I would love to see a similar tutorial on how to integrate with Celery from other applications like postgresql to run reporting, etc on a schedule.

Andrew SB (DigitalOcean Employee), June 11, 2014

@mitesh.development: Sorry, we’re a proud Linux shop!

Really Very Nice For Beginners…Thanks a lot…

What is the rabbitMQ used for in here ?

Hi, I use RabbitMQ on my local PC, but when I give the following command, “celery worker -A tasks &”, it returns an error like ImportError: No module named tasks. The error traceback is:

  File "/home/vrs/bizbii_env/bin/celery", line 11, in <module>
    sys.exit(main())
  File "/home/vrs/bizbii_env/local/lib/python2.7/site-packages/celery/__main__.py", line 30, in main
    main()
  File "/home/vrs/bizbii_env/local/lib/python2.7/site-packages/celery/bin/celery.py", line 81, in main
    cmd.execute_from_commandline(argv)
  File "/home/vrs/bizbii_env/local/lib/python2.7/site-packages/celery/bin/celery.py", line 793, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/home/vrs/bizbii_env/local/lib/python2.7/site-packages/celery/bin/base.py", line 309, in execute_from_commandline
    argv = self.setup_app_from_commandline(argv)
  File "/home/vrs/bizbii_env/local/lib/python2.7/site-packages/celery/bin/base.py", line 469, in setup_app_from_commandline
    self.app = self.find_app(app)
  File "/home/vrs/bizbii_env/local/lib/python2.7/site-packages/celery/bin/base.py", line 489, in find_app
    return find_app(app, symbol_by_name=self.symbol_by_name)
  File "/home/vrs/bizbii_env/local/lib/python2.7/site-packages/celery/app/utils.py", line 235, in find_app
    sym = symbol_by_name(app, imp=imp)
  File "/home/vrs/bizbii_env/local/lib/python2.7/site-packages/celery/bin/base.py", line 492, in symbol_by_name
    return symbol_by_name(name, imp=imp)
  File "/home/vrs/bizbii_env/local/lib/python2.7/site-packages/kombu/utils/__init__.py", line 96, in symbol_by_name
    module = imp(module_name, package=package, **kwargs)
  File "/home/vrs/bizbii_env/local/lib/python2.7/site-packages/celery/utils/imports.py", line 101, in import_from_cwd
    return imp(module, package=package)
  File "/usr/lib/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
ImportError: No module named tasks

saviour !!! :)

and any notes on celery beat

Is there an instruction on how to install celery with django and rabbitmq on a digitalocean droplet or server?
