Making Celery play nice with RabbitMQ (and Bigwig)

By: on April 30, 2015

Celery is a Python library that implements a task queue with scheduling, backed by various “brokers”, including RabbitMQ, which is used by default. Celery is meant to be a simple abstraction over the details of the brokers backing the task queue, and for many small applications this works fine. As your application grows, the defaults can become a big problem. We have seen many cases where the default configuration has caused issues on Bigwig, particularly on the free tier, where your resources are shared with others. You might be the noisy neighbour, or be affected by one. This post highlights some of the problems associated with using Celery with RabbitMQ.

RabbitMQ Broker Settings

In general, you don’t want your tasks (sent as messages to RabbitMQ) to be lost in transit. RabbitMQ has a couple of built-in features that let your application be sure a message has been sent: publisher confirms and heartbeats. Publisher confirms let your application verify asynchronously that a message has reached RabbitMQ, and your performance will likely take only a negligible hit. Heartbeats let your application check that its connection to the RabbitMQ server is still alive, outside of the usual TCP timeouts. By default, Celery does not make use of these features.

To enable publisher confirms, add the following line to your Celery configuration:

BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}

To enable heartbeats, use the BROKER_HEARTBEAT option.
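
As a minimal sketch, the two broker settings above could sit together in a configuration module. The file name and the 10-second heartbeat interval are illustrative assumptions, not recommendations from this post; pick an interval that suits your network.

```python
# celeryconfig.py (hypothetical file name)

# Ask the AMQP transport to wait for RabbitMQ to confirm each published message.
BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}

# Heartbeat interval in seconds. The value 10 is an illustrative assumption.
BROKER_HEARTBEAT = 10
```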

Using RabbitMQ as a results backend


In general, it is not a good idea to use RabbitMQ as a backend to store results from your tasks. There are several large caveats to using RabbitMQ this way.

The RabbitMQ result backend (amqp) is special as it does not actually store the states, but rather sends them as messages. This is an important difference, as it means that a result can only be retrieved once: if you have two processes waiting for the same result, one of them will never receive it!

Because RabbitMQ is a messaging tool, its messages are supposed to be transient. It is not a good idea to use RabbitMQ to store the state or result of anything.

Every new task creates a new queue on the server. With thousands of tasks, the broker may be overloaded with queues, and this will hurt performance. With RabbitMQ, each queue is a separate Erlang process, so if you’re planning to keep many results simultaneously you may have to increase the Erlang process limit and the maximum number of file descriptors your OS allows.

Old results will be cleaned automatically, based on the CELERY_TASK_RESULT_EXPIRES setting. By default this is set to expire after 1 day: if you have a very busy cluster you should lower this value.

We recommend that you use another result backend to store and retrieve results from tasks. If you do not care about the results at all, set CELERY_IGNORE_RESULT to True. You should also set CELERY_STORE_ERRORS_EVEN_IF_IGNORED to False so that failed tasks do not create queues in RabbitMQ.
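
A minimal sketch of those two settings as they might appear in a Celery configuration module (the surrounding module is assumed, not shown in this post):

```python
# Do not store task results at all.
CELERY_IGNORE_RESULT = True

# Do not store errors either, so failed tasks create no result queues.
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = False
```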

If you absolutely must use RabbitMQ to retrieve results, try the new RPC result backend, which creates a queue for each client (task producer) rather than a queue for each task. You should also ensure that your task producers consume and acknowledge the results as soon as they can. We also recommend lowering the CELERY_TASK_RESULT_EXPIRES setting to less than a day, especially if you have a large number of tasks. You can also ignore results on individual tasks.
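
As a sketch, selecting the RPC backend and shortening the expiry might look like this. The one-hour expiry is an illustrative assumption, not a recommended value; choose one based on how quickly your producers collect results.

```python
# Use the RPC result backend: one result queue per client, not per task.
CELERY_RESULT_BACKEND = 'rpc://'

# Expire results after one hour instead of the one-day default.
# 3600 seconds is an illustrative assumption.
CELERY_TASK_RESULT_EXPIRES = 3600
```

To ignore results for a single task only, Celery accepts ignore_result=True as an option on the task decorator.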

Worker Prefetch

Prefetching is where a worker process “reserves” messages from RabbitMQ queues and removes them from the queue. This means that other workers will no longer be able to get those messages. This is controlled in Celery by the CELERYD_PREFETCH_MULTIPLIER setting. Tune this setting based on how long your tasks usually take to run.

If the worker dies before any of the reserved messages are processed (and therefore acknowledged), RabbitMQ will notice this and then put the messages back in the queue for other workers.
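
To make the effect of the multiplier concrete: Celery multiplies the setting by the number of concurrent worker processes to get the number of messages a worker may reserve. The helper below is hypothetical (it is not part of Celery) and the concurrency figure is made up for the arithmetic. The multiplier of 4 is Celery's documented default.

```python
# Reserve only one message per worker process. A low value like this is a
# common choice for long-running tasks; treat it as a starting point.
CELERYD_PREFETCH_MULTIPLIER = 1


def prefetch_limit(multiplier, concurrency):
    """Hypothetical helper: messages one worker may reserve at a time."""
    return multiplier * concurrency


# With the default multiplier of 4 and 8 worker processes, a single worker
# can hold 32 messages that no other worker can pick up.
default_limit = prefetch_limit(4, 8)

# With the setting above, the same worker holds at most 8.
tuned_limit = prefetch_limit(CELERYD_PREFETCH_MULTIPLIER, 8)
```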

Worker acknowledgement and retrying

When a Celery worker starts to work on a task, it will have to acknowledge to RabbitMQ that it has consumed the message that kickstarted the task execution. Acknowledgement removes the message from the queue. This can happen before the task is executed, or after the task has executed. Globally, this can be controlled by the CELERY_ACKS_LATE setting, or on a per-task basis by Task.acks_late.

By default, Celery acknowledges messages before the tasks are executed, removing the messages from the queue. If the worker dies, or something else halts the execution of the task, RabbitMQ will not know about it and will do nothing. In this case, you can ask Celery to retry the task based on some timeout, but this might require you to use a result backend. Celery will send the retry as a new message with the same task identifier. If you enable late acknowledgement, Celery will not acknowledge the message until execution is complete. If the worker dies or disconnects from RabbitMQ, RabbitMQ will automatically redeliver the message, for example to another worker.

A requirement for restarting tasks is that the tasks must be idempotent: running a task more than once must not cause its effects to be repeated. If tasks are not idempotent, restarting or retrying them can have dire consequences.
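
One way to picture idempotency is a task body that records which units of work it has already applied. The sketch below is plain Python with an in-memory dictionary and set standing in for a real database. All names are hypothetical, and a production version would need the check and the update to happen atomically.

```python
# In-memory stand-ins for persistent storage (hypothetical).
balances = {'alice': 100}
applied_payments = set()


def apply_payment(payment_id, account, amount):
    """Credit an account at most once per payment_id.

    If the same message is redelivered, the second call sees that the
    payment was already applied and changes nothing.
    """
    if payment_id in applied_payments:
        return balances[account]
    balances[account] += amount
    applied_payments.add(payment_id)
    return balances[account]


first = apply_payment('p1', 'alice', 25)   # applies the credit
second = apply_payment('p1', 'alice', 25)  # redelivery: no further effect
```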

You should always try to make your tasks idempotent, and have RabbitMQ handle the retrying for you by turning on late acknowledgement. If you need more control over how retries are handled, you will have to use a result backend and then detect when tasks have failed. In addition, you can have your task detect when things go wrong and retry itself. Combined with late acknowledgement, this lets you guard against unexpected errors such as your worker’s machine dying.

The caveat is that if your workers frequently die or never complete tasks, your message queue will keep growing.

It’s nice to be shielded from the details

While it is nice to work with a black box and be shielded from all the details underneath, using things in ways they were not designed for can have painful consequences. Celery uses RabbitMQ to send tasks to multiple workers, and can also use RabbitMQ as a store for task results. The former is what RabbitMQ is designed for; the latter is far from it. It is important to understand the systems running underneath so that you can design your applications in the right frame of mind.

4 Comments

  1. Niklas B says:

    I really appreciated this article. Been using celery in production for about 1-2 years now and have started to feel the pain of my ignorance 🙂 (i.e. this article helped me tweak another few parameters)

  2. Ask Solem says:

    Nice article!

    Celery uses RabbitMQ to send tasks to multiple workers, and also use RabbitMQ as a store for task results. The former is what RabbitMQ is designed for, but the latter is far from what RabbitMQ is designed for.

    There are two AMQP based result backends in celery, the oldest one called `amqp` uses RabbitMQ as a datastore but its use is now discouraged. The new result backend, called `rpc`, uses non-persistent messages and only the caller of the task can retrieve the result.

    If you need persistent results, or results retrieved by multiple processes, you should use a result backend backed by a database like redis or sqlalchemy.

  3. Ben says:

    Hey,

    Where are some docs on this option?

    BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}

    I’m having trouble finding any references to it.
