EvServer, part2: Rabbit and Comet

Posted on February 18, 2009

A few days ago I introduced EvServer. In this post I’ll present a simple EvServer example.

EvServer is a normal WSGI server with one additional feature: instead of blocking inside your WSGI application, you yield a file descriptor to the server. When there is activity on that descriptor, the server resumes your WSGI app, which then runs until it yields again.
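The mechanism can be sketched with plain Python generators. This is not EvServer's code: `run` is a hypothetical stand-in for the server that uses select() rather than EvServer's own event loop, and the toy `app` yields the socket directly instead of going through the WSGI interface. It only illustrates the "yield a descriptor, get resumed on activity" idea.

```python
import select
import socket

def app(sock, log):
    """Toy application: yields a socket whenever it would otherwise block."""
    while True:
        yield sock                  # ask the loop to wake us when sock is readable
        data = sock.recv(1024)      # the socket is readable now, so this returns at once
        if not data:                # an empty read means the peer closed the connection
            return
        log.append(data)

def run(gen):
    """Toy server: resume the generator each time its descriptor becomes readable."""
    try:
        waiting_on = next(gen)
        while True:
            select.select([waiting_on], [], [])   # sleep until there is activity
            waiting_on = next(gen)                # continue the app until its next yield
    except StopIteration:
        pass

a, b = socket.socketpair()
b.sendall(b"hello")
b.close()

log = []
run(app(a, log))
a.close()
```

A real server would multiplex many such generators over one select() or epoll() call; the single-generator loop here is kept deliberately small.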

I’ll show how to wait for AMQP messages inside the WSGI application and how to push them up to the browser. If you can’t wait till the end of the post, please feel free to view the online demo (outdated) of the code described below.


Consuming (from) the Rabbit

To consume AMQP messages we need the Python AMQP library, py-amqplib (version 0.6). Barry Pederson rewrote it recently and I must admit I really like the code now. Great work Barry! Usually, the code that consumes messages (a subscriber) looks like this:

import amqplib.client_0_8 as amqp

def callback(msg):
    print msg.body

def main():
    conn = amqp.Connection('localhost', userid='guest', password='guest')
    ch = conn.channel()
    ch.access_request('/data', active=True, read=True)
    ch.exchange_declare('myfan', 'fanout', auto_delete=True)
    qname, _, _ = ch.queue_declare()
    ch.queue_bind(qname, 'myfan')
    ch.basic_consume(qname, callback=callback)
    while ch.callbacks:
        ch.wait() # blocking here!
    ch.close()
    conn.close()

The problem is that we need to use it in an asynchronous WSGI application, which must never block. The conversion is not very difficult. First we identify the socket descriptor on which wait() blocks, then we make it non-blocking. This solution is quite hackish, but it’s all we can do. Maybe this hack should be put in the Hall of Fame of Dirty Hacks…

    sd = conn.transport.sock
    sd.setblocking(False)

At this point, a bad thing happens when ch.wait() tries to block on a non-blocking descriptor. We’d expect the socket to raise socket.error: (11, ‘Resource temporarily unavailable’). Actually, it fails with quite a strange exception, but fortunately it doesn’t break anything inside py-amqplib:

Traceback (most recent call last):
[...]
  File "/home/majek/amqplib-0.6/amqplib/client_0_8/connection.py", line 201, in _wait_method
    self.method_reader.read_method()
TypeError: 'NoneType' object is not iterable

We use this exception to identify when the library wants to block. Having this knowledge, the main loop of a non-blocking consumer is now easy to write:

    conn.transport.sock.setblocking(False)
    while True:
        try:
            while True: # until the exception
                ch.wait()
        except TypeError:
            pass

        <block till activity on conn.transport.sock>
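The same "drain until the library would block, then wait for activity" shape can be tried without RabbitMQ. The sketch below is an illustration on a plain socket pair, not py-amqplib code: the helper name `drain` is made up, and a raw non-blocking socket signals "would block" with EAGAIN rather than the TypeError seen above.

```python
import errno
import select
import socket

def drain(sock):
    """Read everything currently available; return (chunks, peer_closed)."""
    chunks = []
    while True:                     # until the socket would block
        try:
            data = sock.recv(4096)
        except socket.error as e:
            if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                return chunks, False    # nothing more to read right now
            raise
        if not data:
            return chunks, True         # empty read: the peer closed the connection
        chunks.append(data)

reader, writer = socket.socketpair()
reader.setblocking(False)

writer.sendall(b"one")
select.select([reader], [], [])         # block till activity on the socket
first, closed_first = drain(reader)

writer.close()
select.select([reader], [], [])         # the close also counts as activity
second, closed_second = drain(reader)
reader.close()
```

In the AMQP consumer the inner recv() loop is replaced by repeated ch.wait() calls, and the select() call by whatever mechanism the server offers for waiting on the descriptor.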

At this point, modifying the code to become a valid AWSGI application is straightforward. The full code can be found in the EvServer examples directory. Here’s the simplified version:

def wsgi_subscribe(environ, start_response):
    start_response("200 OK", [('Content-type','text/plain')])

    msgs = []
    def callback(msg):
        msgs.append(msg.body)
        msg.channel.basic_ack(msg.delivery_tag)

    <setup connection, channel, queue>
    conn.transport.sock.setblocking(False)

    try:
        while ch.callbacks:
            try:
                while True: # until exception
                    ch.wait()
            except (TypeError,), e:
                pass

            yield 'got messages: %r\n' % (msgs,)
            while msgs: msgs.pop() # empty the queue

            # block!
            yield environ['x-wsgiorg.fdevent.readable'](conn.transport.sock)
    except GeneratorExit:
        pass
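The `except GeneratorExit` clause matters because a WSGI server calls close() on the response iterable when the request is finished, and closing a generator raises GeneratorExit at the yield where it is paused. A small standalone sketch of that behaviour follows; `streaming_app` and `cleanup_log` are illustrative names, and the cleanup step is an assumption about where releasing the AMQP channel and connection could go.

```python
def streaming_app(cleanup_log):
    """Stand-in for a long-lived WSGI generator that streams messages."""
    try:
        n = 0
        while True:
            n += 1
            yield 'message %d\n' % n
    except GeneratorExit:
        # Raised at the paused yield when the consumer calls close().
        # A real application could close its channel and connection here.
        cleanup_log.append('connection released')

cleanup_log = []
body = streaming_app(cleanup_log)
first = next(body)
second = next(body)
body.close()        # what a server does once the client has gone away
```

Because the handler returns without yielding again, close() completes normally and the generator is finished.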

Introducing the Comet library

Michael Carter has figured out how to make Comet – long lasting HTTP push connection – work on all major browsers. As a part of EvServer I distribute a simple javascript Comet library based on his work. The basic API consists of one method that creates a comet channel: comet_connection(url, callback).

<script src="./static/comet.js" type="text/javascript"></script>

<script>
function user_callback(data){
    alert('got message: ' + data);
}
close_comet_function = comet_connection(url, user_callback);
</script>

There are many different encapsulation types for Comet messages and the format of the messages emitted from the server depends sensitively on the browser type. Fortunately the WSGI application can remain quite simple by using the evserver.transports wrapper:

import evserver.transports as transports

def simplest_comet_application(environ, start_response):
    t = transports.get_transport(<comet transport family>)
    start_response('200 OK', t.get_headers())
    yield t.start()
    yield t.write('first message!')
    yield t.write('second message!')
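To give a feel for why a wrapper helps, here is a toy pair of transports that frame the same message in two different ways. These classes are hypothetical and are not the evserver.transports implementation. They only borrow the method names used above (get_headers, start, write), and the `parent.on_message` callback name is invented for the example.

```python
import json

class LineTransport(object):
    """Toy transport: one JSON document per line, for a streaming XHR client."""
    def get_headers(self):
        return [('Content-Type', 'text/plain')]
    def start(self):
        return ''
    def write(self, data):
        return json.dumps(data) + '\n'

class IframeTransport(object):
    """Toy transport: each message is a <script> block run in a hidden iframe."""
    def get_headers(self):
        return [('Content-Type', 'text/html')]
    def start(self):
        return '<html><body>\n'
    def write(self, data):
        # Escape "</" so a message cannot terminate the script element early.
        payload = json.dumps(data).replace('</', '<\\/')
        return '<script>parent.on_message(%s);</script>\n' % payload

def render(transport):
    """Produce the chunks an application would yield for one message."""
    yield transport.start()
    yield transport.write('first message!')

lines = list(render(LineTransport()))
frames = list(render(IframeTransport()))
```

The application code in `render` is identical for both transports; only the bytes on the wire differ, which is the point of hiding the encapsulation behind one interface.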

Please, don’t build a chat

I think we’re all sick of yet-another-chat examples. I propose to build something similar, but instead of broadcasting chat messages let’s broadcast the User-Agent and Referer HTTP headers of the people who view the site.

The HTML page is going to be dead simple.

On the server side we will serve comet.js file and the main HTML file. While serving the HTML, we’ll send an AMQP message. We also need to create a Comet channel URL. Let’s use Django – in total we need three Django views, one of which can be generic.
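The message body itself can be built from the request headers alone. The following is a minimal sketch, assuming the message is serialized as JSON (the format, the function name `visitor_message` and the use of the json module are assumptions, not taken from the project code). It reads the standard CGI-style keys that WSGI puts in `environ`, which Django also exposes through `request.META`.

```python
import json

def visitor_message(environ):
    """Build the text of a broadcast message from a WSGI environ dict."""
    return json.dumps({
        'user_agent': environ.get('HTTP_USER_AGENT', ''),
        'referer': environ.get('HTTP_REFERER', ''),
    }, sort_keys=True)

example = visitor_message({
    'HTTP_USER_AGENT': 'Mozilla/5.0',
    'HTTP_REFERER': 'http://example.com/',
})
```

The resulting string is what the HTML view would publish to the exchange, and what each subscribed Comet connection would later push to its browser.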

Deployment

We need: Python 2.5, RabbitMQ, EvServer, py-amqplib 0.6, Django and my Django project code.

sudo apt-get install erlang-nox setuptools subversion
wget http://www.rabbitmq.com/releases/rabbitmq-server/v1.5.1/rabbitmq-server_1.5.1-1_all.deb
sudo dpkg -i rabbitmq-server_1.5.1-1_all.deb

wget http://evserver.googlecode.com/files/<current-evserver-version>
sudo easy_install evserver-*.egg

wget http://barryp.org/static/software/download/py-amqplib/0.6/amqplib-0.6.tgz
tar xvzf amqplib-0.6.tgz
cd amqplib-0.6
sudo python setup.py install
cd ..

wget http://www.djangoproject.com/download/1.0.2/tarball/
tar xzvf Django-1.0.2-final.tar.gz
cd Django-1.0.2-final
sudo python setup.py install

cd ..

svn checkout http://evserver.googlecode.com/svn/trunk/evserver/examples/django_agentpush django_agentpush

After these steps, to run the code just type:

cd django_agentpush
./manage.py runevserver
or
PYTHONPATH=".." DJANGO_SETTINGS_MODULE=django_agentpush.settings evserver --framework=django

Currently you should be able to view the working code online (outdated) (source code is here).

Summary

I’ve shown how to build a Comet application that waits for messages from RabbitMQ. Using these tools you should be able to build ambitious real-time collaboration web applications, like EtherPad. I’m starting to believe that the hardest part of real-time web apps is the JavaScript.



7 Comments

  1. Brian H. says:

    What’s your CPU usage? I ask because unless I’m missing something, this is a tight loop:

    while True: # until exception
        ch.wait()

  2. marek says:

    What’s your CPU usage?

    Almost none. The server is mostly waiting for data on sockets using epoll(). You can track the current numbers using the status interface: http://cometdemo.lshift.net:9999/

    Currently, after a few days of running, the server has used 90 seconds of CPU, but 60 of those seconds were spent in the kernel (system). So the actual application used about 30 seconds of CPU time.

    I ask because unless I’m missing
    something, this is a tight loop:

    ch.wait reads one message from a socket per iteration, so the loop runs exactly once per message received. When ch.wait doesn’t see any data on the socket, it fails with a TypeError exception and the loop is broken.

  3. Denis Laprise says:

    In your example you acknowledge the message in the consumer callback.
    How would you make the acknowledgement from the client? The use case would be:
    the msg is serialized to json and sent to the client (javascript)
    then it gets acknowledged by having the client pinging some wsgi handler.
    the problem that I see is that the delivery_tag is channel-based and the channel is local to the subscriber view
    Any ideas?

  4. sqwishy says:

    How do I get this running with Django on wsgi? It works fine with python manage.py runevserver and I already have wsgi set up to work properly.

  5. sqwishy says:

    Nevermind, I was trying to use apache with evserver but I suppose that’s not possible. There was no multithreading with manage.py.
