python-amqplib-0.6.1-2.fc14.noarch.rpm

Notes on using Python amqplib

For more detailed descriptions of AMQP concepts, visit

    https://jira.amqp.org/confluence/display/AMQP/Download

This doc just covers some example usages of the Python library.


Importing the client
--------------------
This library currently supports the AMQP 0-8 spec.  A module geared
towards that spec can be imported with something like:

    import amqplib.client_0_8 as amqp


Connections
-----------
A connection object represents a network connection to the server.
Some examples for creating a Connection object:

    conn = amqp.Connection()
    conn = amqp.Connection('1.2.3.4')
    conn = amqp.Connection(host='1.2.3.4:5672', ssl=True)
    conn = amqp.Connection(userid='foo', password='bar')

Connections are closed with the close() method:

    conn.close()


Channels
--------
Once a Connection is open, one or more Channel objects may be
opened, acting as virtual connections between peers.  Channels are
where most of the action is.

    ch = conn.channel(1)  # create and open channel with a specific numeric id

    ch2 = conn.channel()   # let amqplib assign a channel number
    print 'my channel is', ch2.channel_id

    ch3 = conn.channel(1) # since channel 1 is already created and open, just
                          # return another reference to that.

Channels have a close() method too, which doesn't affect other channels:

    ch2.close()

+----------------------------------------------------------------
|
|  IMPORTANT!! - be sure to close your channels or connection
|                if you've been calling async methods like
|                basic_publish() - to ensure your messages are
|                actually flushed out the TCP socket before the
|                program ends, instead of potentially being lost.
|
+----------------------------------------------------------------

Connections and Channels will close automatically when used as context
managers in 'with' statements (available in Python 2.5 or higher; Python
2.5 itself needs 'from __future__ import with_statement').
For example:

    with amqp.Connection('1.2.3.4') as conn:
        with conn.channel() as ch:
            #
            # Do some stuff...
            #
            ch.basic_publish(amqp.Message('hello world'), 'amq.fanout')

        # When this point is reached, the Channel will have been
        # closed for you.

    # When this point is reached, the Connection will have been
    # closed for you.
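Where 'with' is not an option, the same guarantee can be had with nested
try/finally blocks.  The following is only a sketch: publish_then_close()
and the two Stub classes are hypothetical names made up for illustration,
and the stubs stand in for real amqplib objects so the pattern can be tried
without a broker.  Only channel(), basic_publish() and close() from the
text above are assumed.

```python
def publish_then_close(conn, msg, exchange):
    """Publish msg, then always close the channel and the connection."""
    try:
        ch = conn.channel()
        try:
            ch.basic_publish(msg, exchange)
        finally:
            # Runs even if basic_publish() raised.
            ch.close()
    finally:
        # Runs even if opening or closing the channel raised.
        conn.close()


# Hypothetical stand-ins (not amqplib classes) that record what happens.
class StubChannel(object):
    def __init__(self, log):
        self.log = log

    def basic_publish(self, msg, exchange):
        self.log.append(('publish', msg, exchange))

    def close(self):
        self.log.append('channel closed')


class StubConnection(object):
    def __init__(self):
        self.log = []

    def channel(self):
        return StubChannel(self.log)

    def close(self):
        self.log.append('connection closed')


stub_conn = StubConnection()
publish_then_close(stub_conn, 'hello world', 'amq.fanout')
```

With a real connection, the stub would simply be replaced by the object
returned from amqp.Connection().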


Access Tickets
--------------
Before a channel can do much else, it needs an access ticket, specifying
a 'realm', and the actions it wants to perform on that realm.

    tkt = ch.access_request('/data', active=True, write=True, read=True)
    ch.exchange_declare('myfan', 'fanout', auto_delete=True, ticket=tkt)

The most recently requested ticket is saved in the Channel object, and
used as a default value for other methods that require tickets.  So it's
not necessary to manually keep and pass it around.  A simpler version of the
above code is:

    ch.access_request('/data', active=True, write=True, read=True)
    ch.exchange_declare('myfan', 'fanout', auto_delete=True)


Messages
--------
This library only supports the Basic Content type, although the class was
named 'Message' based on what's coming in the 0-10 spec.

A Message can be created as simply as

    msg = amqp.Message('hello world')

Unicode bodies are converted to UTF-8 and the 'content_encoding' property
is automatically set to 'UTF-8', so these two Messages are equivalent:

    msg1 = amqp.Message(u'Unicode hello')
    msg2 = amqp.Message(u'Unicode hello'.encode('utf-8'), content_encoding='UTF-8')
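The conversion described above can be tried in plain Python without the
library.  In this sketch encode_body() is a hypothetical helper, not part
of amqplib; it only mirrors the behaviour as described here, and the real
library may treat edge cases differently.

```python
# Hypothetical helper, not part of amqplib: mimics the body conversion
# described above so it can be tried without a broker.
def encode_body(body, properties=None):
    """Return (byte_string_body, properties) for a message body."""
    props = dict(properties or {})
    if isinstance(body, bytes):
        # Byte strings are passed through untouched.
        return body, props
    # Text bodies are encoded as UTF-8 and labelled accordingly.
    props['content_encoding'] = 'UTF-8'
    return body.encode('utf-8'), props


body1, props1 = encode_body(u'Unicode h\xe9llo')
body2, props2 = encode_body(u'Unicode h\xe9llo'.encode('utf-8'),
                            {'content_encoding': 'UTF-8'})
```

Both calls end up with the same bytes and the same 'content_encoding'
value, which is the sense in which the two Messages above are equivalent.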

Messages may have an 'application_headers' dictionary (another 0-10ism; 0-8
just calls the property 'headers').  Keys are strings, and values may be
strings, integers, Decimal (a bit shaky), datetime.datetime, or other
dictionaries containing those same types.

    msg = amqp.Message('fancy', application_headers={'foo': 7, 'bar': 'baz'})


Exchanges & Queues
------------------
In the AMQP model, publishers send messages to exchanges, which then
distribute them to queues, which deliver them to receivers.
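As a rough mental model of that flow, here is a toy in-memory fanout
exchange.  It is an illustration only: ToyFanoutExchange is a made-up
class, has nothing to do with amqplib's API, and ignores routing keys,
acknowledgements and everything else a real broker does.

```python
from collections import deque


class ToyFanoutExchange(object):
    """In-memory stand-in for a fanout exchange (illustration only)."""

    def __init__(self):
        self.queues = {}

    def bind(self, queue_name):
        # Create the queue on first bind and return it.
        return self.queues.setdefault(queue_name, deque())

    def publish(self, body):
        # A fanout exchange copies each message to every bound queue.
        for queue in self.queues.values():
            queue.append(body)


exchange = ToyFanoutExchange()
q1 = exchange.bind('queue-a')
q2 = exchange.bind('queue-b')
exchange.publish('hello world')
```

After the publish, both q1 and q2 hold their own copy of the message,
ready for whichever receiver reads from them.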

AMQP defines some default exchanges, such as 'amq.direct', 'amq.topic',
and 'amq.fanout', which can be used right away.  To send to one of
these default exchanges:

    ch.basic_publish(msg, 'amq.fanout')

To declare your own exchange and send it a message:

    ch.exchange_declare('myfan', type='fanout')
    ch.basic_publish(msg, 'myfan')

To receive messages, a program must declare a queue, and bind it to an
exchange:

    ch.queue_declare('myqueue')
    ch.queue_bind('myqueue', 'amq.fanout')

If the queue name is omitted from the queue_declare() method, a unique
one is generated by the server and returned (along with message_count and
consumer_count values which we don't care about in this case).

    qname, _, _ = ch.queue_declare()
    ch.queue_bind(qname, 'amq.fanout')


Receiving Messages
------------------
Once a queue is bound to an exchange, messages can be received either by
polling or by waiting on the channel.

Polling with basic_get() immediately returns either a Message object, or
None if no messages are available:

    msg = ch.basic_get('myqueue')
    if msg is not None:
        # do something
        ch.basic_ack(msg.delivery_tag)

Received messages are acknowledged with the Channel.basic_ack() method
unless we called basic_get() with a no_ack=True parameter.
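Polling is often wrapped in a loop that empties the queue.  A minimal
sketch, assuming only basic_get(), basic_ack() and msg.delivery_tag as
shown above; drain_queue() and the Stub classes are hypothetical names,
and the stubs replace a real channel so the loop can be run without a
broker.

```python
def drain_queue(ch, queue_name, handler):
    """Poll queue_name until it is empty, acknowledging each message."""
    handled = 0
    while True:
        msg = ch.basic_get(queue_name)
        if msg is None:
            # Nothing left to fetch right now.
            break
        handler(msg)
        ch.basic_ack(msg.delivery_tag)
        handled += 1
    return handled


# Hypothetical stand-ins (not amqplib classes) for trying the loop offline.
class StubMessage(object):
    def __init__(self, body, delivery_tag):
        self.body = body
        self.delivery_tag = delivery_tag


class StubChannel(object):
    def __init__(self, bodies):
        self.pending = [StubMessage(b, i + 1) for i, b in enumerate(bodies)]
        self.acked = []

    def basic_get(self, queue, no_ack=False):
        return self.pending.pop(0) if self.pending else None

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


stub = StubChannel(['a', 'b', 'c'])
seen = []
count = drain_queue(stub, 'myqueue', lambda m: seen.append(m.body))
```

Acknowledging only after the handler returns means a message whose handler
raised is left unacknowledged.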

To block and wait for messages, use basic_consume() with a callback
function, and then wait() on the channel.  The AMQP server will deliver
messages to the channel as they're placed into the queue.

    def mycallback(msg):
        print 'received', msg.body, 'from channel #', msg.channel.channel_id

    ch.basic_consume('myqueue', callback=mycallback, no_ack=True)
    while True:
        ch.wait()

The above example will loop forever.  To cancel the 'consume', use
basic_cancel() with a consumer_tag.  A consumer_tag is a string that's
either specified when you call basic_consume():

    ch.basic_consume('myqueue', callback=mycallback, consumer_tag='mytag')
    ...
    ch.basic_cancel('mytag')

or uniquely generated by the server:

    tag = ch.basic_consume('myqueue', callback=mycallback)
    ...
    ch.basic_cancel(tag)

The consumer_tag is also included as a property in any Message objects
delivered through a callback, so a callback can cancel its own consumer:

    msg.channel.basic_cancel(msg.consumer_tag)

A Channel object has a 'callbacks' property that can be checked to see
if there are any outstanding basic_consume() operations registered.
The demo/demo_receive.py script uses this to allow for breaking out of
a waiting loop cleanly.
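One way such a loop might look is sketched below.  It assumes that
'callbacks' is empty (falsy) once every consumer has been cancelled;
consume_until_cancelled(), the 'quit' convention and the Stub classes are
hypothetical and only there so the loop can be run without a broker.

```python
def consume_until_cancelled(ch):
    """Wait on ch for as long as it has consumers registered."""
    while ch.callbacks:
        ch.wait()


# Hypothetical stand-ins (not amqplib classes) for trying the loop offline.
class StubMessage(object):
    def __init__(self, body, channel, consumer_tag):
        self.body = body
        self.channel = channel
        self.consumer_tag = consumer_tag


class StubChannel(object):
    def __init__(self, bodies):
        self.callbacks = {}
        self.bodies = list(bodies)

    def basic_consume(self, queue, callback=None, consumer_tag='stub-tag'):
        self.callbacks[consumer_tag] = callback
        return consumer_tag

    def basic_cancel(self, consumer_tag):
        self.callbacks.pop(consumer_tag, None)

    def wait(self):
        # Deliver the next pending body to every registered callback.
        body = self.bodies.pop(0)
        for tag, callback in list(self.callbacks.items()):
            callback(StubMessage(body, self, tag))


received = []


def mycallback(msg):
    received.append(msg.body)
    if msg.body == 'quit':
        # Cancelling the consumer empties ch.callbacks and ends the loop.
        msg.channel.basic_cancel(msg.consumer_tag)


stub = StubChannel(['one', 'two', 'quit', 'never delivered'])
stub.basic_consume('myqueue', callback=mycallback)
consume_until_cancelled(stub)
```

The loop ends on its own after the 'quit' message, so the program can go
on to close its channel and connection normally.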

### EOF ###