Python Queue interface for AMQP

By: on June 11, 2009
Here at LShift we’re often discussing RabbitMQ. We’re keen about complicated deployment scenarios, redundancy of the broker and other complex use cases. While these problems are extremely interesting, some believe they are irrelevant for a great majority of RabbittMQ users.
People keep asking how to get started with Rabbit. There are some very good sources however, understanding the AMQP abstractions requires some time.
Having that in mind I was astonished when I’ve seen this code, where Brian wraps AMQP code with a very simple Queue-like interface. This reminded me that messaging can be trivial and intuitive. In some environments, a queue is exactly what you need from messaging.

(It’s worth noting that Brian is not the only person who tries to simplify some abstractions out.)
Let the Python snippet describe his interface:

>>> import amqp_wrapper
>>> q = amqp_wrapper.QueueProducer("test_queue")
Connected to test_queue (0 msgs, 0 consumers)
>>> q.write("hello world!")
>>> q = amqp_wrapper.QueueConsumer("test_queue")
>>> q.get()
('hello world!', 1)
>>> q.ack(1)
>>> q.get() # never blocks
(None, None)

This code never blocks: if the queue is empty we receive a (None, None) tuple. This means that when the queue is empty we have to do polling, which never is a good choice.

I was inspired to modify his wrapper to mimic the generic Python Queue, with blocking get(). I also tried hard to support non-blocking get(), like the Python interface does, but I eventually failed. Here it is, a simplified Python Queue that uses AMQP under the hood:

>>> import amqpqueue
>>> qp = amqpqueue.Producer('testqueue')
>>> qp.put('test')
>>> qc = amqpqueue.Consumer('testqueue')
>>> qc.get()
>>> qc.task_done()
>>> qc.get() # this blocks



  1. Very nice!

    I’ll probably use this sometime in the future. Thanks.

  2. tonyg says:

    What was the difficulty in supporting non-blocking get?

  3. marek says:

    tonyg: What was the difficulty in supporting non-blocking get?

    To make a long story short: as far as I know py-amqplib interface supports only a magical, blocking method: channel.wait(). All the magic happens there. See this example.

    This method doesn’t support non-blocking sockets, nor timeouts. I tried several attempts to work aroud it’s being blocking, but without success.

  4. Stefan Plantikow says:

    I disagree that polling is always evil: Outside AMQP it removes the need for storing state at the producer that describes which items yet need to be delivered to clients and thus increases loose coupling. This is IMHO one of the main reasons for the success of syndication formats like RSS.

  5. brian says:

    While it may have astonished you, that wrapper code works exactly the way we wanted it to– replacing the (slow, often buggy) SQS that we were doing some of our messaging on. Polling is definitely not an issue on the code we’re running, and it simplifies parallelizing the workers…..

Leave a Reply

Your email address will not be published.

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>