Import from kombu from tarball

This commit is contained in:
Jérôme Schneider 2014-10-31 17:36:08 +01:00
commit e484028434
267 changed files with 34800 additions and 0 deletions

118
AUTHORS Normal file
View File

@ -0,0 +1,118 @@
=========
AUTHORS
=========
:order: sorted
Adam Gaca <adam.gaca@oke.pl>
Adam Nelson <adam@varud.com>
Adam Wentz
Alex Koshelev <daevaorn@gmail.com>
Alexandre Bourget <alexandre.bourget@savoirfairelinux.com>
Andrew Watts
Andrey Antukh <niwi@niwi.be>
Andrii Kostenko <andrey@kostenko.name>
Andy McCurdy <andy@andymccurdy.com>
Antoine Legrand <antoine.legrand@smartjog.com>
Anton Gyllenberg <anton@iki.fi>
Ask Solem <ask@celeryproject.org>
Basil Mironenko <bmironenko@ddn.com>
Bobby Beever <bobby.beever@yahoo.com>
Brian Bernstein
C Anthony Risinger <anthony+corvisa.com@xtfx.me>
Christophe Chauvet <christophe.chauvet@gmail.com>
Christopher Grebs <cg@webshox.org>
Clay Gerrard <clay.gerrard@gmail.com>
Corentin Ardeois <cardeois@iweb.com>
Dan LaMotte <lamotte85@gmail.com>
Dan McGee <dan@archlinux.org>
Dane Guempel <daneguempel@gmail.com>
Davanum Srinivas <dims@linux.vnet.ibm.com>
David Clymer <david@zettazebra.com>
David Gelvin <david.gelvin@gmail.com>
David Strauss <david@davidstrauss.net>
David Ziegler <david.ziegler@gmail.com>
Dhananjay Nene <dhananjay.nene@gmail.com>
Dmitry Malinovsky <dmalinovsky@thumbtack.net>
Dustin J. Mitchell <dustin@mozilla.com>
Ephemera <obliviscence+git@gmail.com>
Eric Reynolds <ereynolds@opendns.com>
Fabrice Rabaute <fabrice@expa.com>
Felix Schwarz <felix.schwarz@oss.schwarz.eu>
Fernando Jorge Mota <f.j.mota13@gmail.com>
Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>
Florian Munz <surf@theflow.de>
Franck Cuny <fcuny@saymedia.com>
Germán M. Bravo <german.mb@gmail.com>
Gregory Haskins <greg@greghaskins.com>
Hong Minhee <minhee@dahlia.kr>
Ian Eure <ian.eure@gmail.com>
Ian Struble <istruble@gmail.com>
Ionel Maries Cristian <contact@ionelmc.ro>
James Saryerwinnie <js@jamesls.com>
James Turk <james.p.turk@gmail.com>
Jason Cater <jason@ncsfulfillment.com>
Jasper Bryant-Greene <jbg@rf.net.nz>
Jeff Balogh <me@jeffbalogh.org>
Jesper Thomschütz <jesper@jespersaur.com>
John Shuping <jshuping@acm.org>
John Spray <jcspray@gmail.com>
John Watson <john@disqus.com>
Jonathan Halcrow <jonathan.halcrow@gmail.com>
Joseph Crosland <jcrosland@flumotion.com>
Keith Fitzgerald <ghostrocket@me.com>
Kevin McCarthy <me@kevinmccarthy.org>
Kevin McDonald <k3vinmcdonald@gmail.com>
Latitia M. Haskins <lhaskins@jetsonsys.com>
Len Buckens <buckens.len@gmail.com>
Mahendra M <Mahendra_M@infosys.com>
Marcin Lulek (ergo) <info@webreactor.eu>
Mark Lavin <mlavin@caktusgroup.com>
Matt Wise <wise@wiredgeek.net>
Maxime Rouyrre <rouyrre+git@gmail.com>
Mher Movsisyan <mher.movsisyan@gmail.com>
Michael Barrett <mb@eventbrite.com>
Michael Nelson <michaeln@telesign.com>
Nitzan Miron <bug.assembla@bugbug.me>
Noah Kantrowitz <noah@coderanger.net>
Ollie Walsh <ollie.walsh@geemail.kom>
Pascal Hartig <phartig@rdrei.net>
Patrick Schneider <patrick.p2k.schneider@gmail.com>
Paul McLanahan <paul@mclanahan.net>
Petar Radosevic <petar@wunki.org>
Peter Hoffmann <tosh54@gmail.com>
Pierre Riteau <priteau@ci.uchicago.edu>
Rafael Duran Castaneda <rafadurancastaneda@gmail.com>
Rafal Malinowski <malinowski@red-sky.pl>
Ralf Nyren <ralf-github@nyren.net>
Randy Barlow <rbarlow@redhat.com>
Rob Ottaway <robottaway@gmail.com>
Roger Hu <rhu@hearsaycorp.com>
Rumyana Neykova <rumi.neykova@gmail.com>
Rune Halvorsen <runeh@opera.com>
Ryan Petrello <lists@ryanpetrello.com>
Sam Stavinoha <smlstvnh@gmail.com>
Sascha Peilicke <saschpe@gmx.de>
Scott Lyons <scottalyons@gmail.com>
Sean Bleier <sebleier@gmail.com>
Sean Creeley <sean.creeley@gmail.com>
Seb Insua <sebastian.insua@saffrondigital.com>
Shane Caraveo <shane@caraveo.com>
Steeve Morin <steeve.morin@gmail.com>
Stefan Eletzhofer <se@nexiles.de>
Stephan Jaekel <steph@rdev.info>
Stephen Day <stevvooe@gmail.com>
Tareque Hossain
Thomas Johansson <prencher@prencher.dk>
Tobias Schottdorf <tobias@goshippo.com>
Tomaž Muraus <kami@k5-storitve.net>
Tommie McAfee <tommie@couchbase.com>
Travis Cline <travis.cline@gmail.com>
Travis Swicegood <development@domain51.com>
Victor Garcia <victor@tuenti.com>
Viet Hung Nguyen <hvnsweeting@gmail.com>
Vince Gonzalez <vince.gonzalez@gmail.com>
Vincent Driessen <vincent@datafox.nl>
Zach Smith <zmsmith27@gmail.com>
Zhao Xiaohong <mrluanma@gmail.com>
haridsv
iSlava <sig.crea@gmail.com>

3012
Changelog Normal file

File diff suppressed because it is too large Load Diff

16
FAQ Normal file
View File

@ -0,0 +1,16 @@
============================
Frequently Asked Questions
============================
Questions
=========
Q: Message.reject doesn't work?
--------------------------------------
**Answer**: Earlier versions of RabbitMQ did not implement ``basic.reject``,
so make sure your version is recent enough to support it.
Q: Message.requeue doesn't work?
--------------------------------------
**Answer**: See _`Message.reject doesn't work?`

21
INSTALL Normal file
View File

@ -0,0 +1,21 @@
Installation
============
You can install ``kombu`` either via the Python Package Index (PyPI)
or from source.
To install using ``pip``,::
$ pip install kombu
To install using ``easy_install``,::
$ easy_install kombu
If you have downloaded a source tarball you can install it
by doing the following,::
$ python setup.py build
# python setup.py install # as root

26
LICENSE Normal file
View File

@ -0,0 +1,26 @@
Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved.
Copyright (c) 2009-2012, Ask Solem & contributors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Ask Solem nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Ask Solem OR CONTRIBUTORS
BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

17
MANIFEST.in Normal file
View File

@ -0,0 +1,17 @@
include AUTHORS
include Changelog
include FAQ
include INSTALL
include LICENSE
include MANIFEST.in
include README.rst
include README
include THANKS
include TODO
include setup.cfg
recursive-include extra *
recursive-include docs *
recursive-include kombu *.py
recursive-include requirements *.txt
recursive-include funtests *.py setup.cfg
recursive-include examples *.py

355
PKG-INFO Normal file
View File

@ -0,0 +1,355 @@
Metadata-Version: 1.1
Name: kombu
Version: 3.0.21
Summary: Messaging library for Python
Home-page: http://kombu.readthedocs.org
Author: Ask Solem
Author-email: ask@celeryproject.org
License: UNKNOWN
Description: .. _kombu-index:
========================================
kombu - Messaging library for Python
========================================
:Version: 3.0.21
`Kombu` is a messaging library for Python.
The aim of `Kombu` is to make messaging in Python as easy as possible by
providing an idiomatic high-level interface for the AMQ protocol, and also
provide proven and tested solutions to common messaging problems.
`AMQP`_ is the Advanced Message Queuing Protocol, an open standard protocol
for message orientation, queuing, routing, reliability and security,
for which the `RabbitMQ`_ messaging server is the most popular implementation.
Features
========
* Allows application authors to support several message server
solutions by using pluggable transports.
* AMQP transport using the `py-amqp`_ or `librabbitmq`_ client libraries.
* High performance AMQP transport written in C - when using `librabbitmq`_
This is automatically enabled if librabbitmq is installed::
$ pip install librabbitmq
* Virtual transports makes it really easy to add support for non-AMQP
transports. There is already built-in support for `Redis`_,
`Beanstalk`_, `Amazon SQS`_, `CouchDB`_, `MongoDB`_, `ZeroMQ`_,
`ZooKeeper`_, `SoftLayer MQ`_ and `Pyro`_.
* You can also use the SQLAlchemy and Django ORM transports to
use a database as the broker.
* In-memory transport for unit testing.
* Supports automatic encoding, serialization and compression of message
payloads.
* Consistent exception handling across transports.
* The ability to ensure that an operation is performed by gracefully
handling connection and channel errors.
* Several annoyances with `amqplib`_ has been fixed, like supporting
timeouts and the ability to wait for events on more than one channel.
* Projects already using `carrot`_ can easily be ported by using
a compatibility layer.
For an introduction to AMQP you should read the article `Rabbits and warrens`_,
and the `Wikipedia article about AMQP`_.
.. _`RabbitMQ`: http://www.rabbitmq.com/
.. _`AMQP`: http://amqp.org
.. _`py-amqp`: http://pypi.python.org/pypi/amqp/
.. _`Redis`: http://code.google.com/p/redis/
.. _`Amazon SQS`: http://aws.amazon.com/sqs/
.. _`MongoDB`: http://www.mongodb.org/
.. _`CouchDB`: http://couchdb.apache.org/
.. _`ZeroMQ`: http://zeromq.org/
.. _`Zookeeper`: https://zookeeper.apache.org/
.. _`Beanstalk`: http://kr.github.com/beanstalkd/
.. _`Rabbits and warrens`: http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
.. _`amqplib`: http://barryp.org/software/py-amqplib/
.. _`Wikipedia article about AMQP`: http://en.wikipedia.org/wiki/AMQP
.. _`carrot`: http://pypi.python.org/pypi/carrot/
.. _`librabbitmq`: http://pypi.python.org/pypi/librabbitmq
.. _`Pyro`: http://pythonhosting.org/Pyro
.. _`SoftLayer MQ`: http://www.softlayer.com/services/additional/message-queue
.. _transport-comparison:
Transport Comparison
====================
+---------------+----------+------------+------------+---------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** |
+---------------+----------+------------+------------+---------------+
| *amqp* | Native | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) |
+---------------+----------+------------+------------+---------------+
| *mongodb* | Virtual | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
+---------------+----------+------------+------------+---------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
.. [#f1] Declarations only kept in memory, so exchanges/queues
must be declared by all clients that needs them.
.. [#f2] Fanout supported via storing routing tables in SimpleDB.
Disabled by default, but can be enabled by using the
``supports_fanout`` transport option.
Documentation
-------------
Kombu is using Sphinx, and the latest documentation can be found here:
http://kombu.readthedocs.org/
Quick overview
--------------
::
from kombu import Connection, Exchange, Queue
media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
def process_media(body, message):
print body
message.ack()
# connections
with Connection('amqp://guest:guest@localhost//') as conn:
# produce
producer = conn.Producer(serializer='json')
producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
exchange=media_exchange, routing_key='video',
declare=[video_queue])
# the declare above, makes sure the video queue is declared
# so that the messages can be delivered.
# It's a best practice in Kombu to have both publishers and
# consumers declare the queue. You can also declare the
# queue manually using:
# video_queue(conn).declare()
# consume
with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
# Process messages and handle events on all channels
while True:
conn.drain_events()
# Consume from several queues on the same channel:
video_queue = Queue('video', exchange=media_exchange, key='video')
image_queue = Queue('image', exchange=media_exchange, key='image')
with connection.Consumer([video_queue, image_queue],
callbacks=[process_media]) as consumer:
while True:
connection.drain_events()
Or handle channels manually::
with connection.channel() as channel:
producer = Producer(channel, ...)
consumer = Producer(channel)
All objects can be used outside of with statements too,
just remember to close the objects after use::
from kombu import Connection, Consumer, Producer
connection = Connection()
# ...
connection.release()
consumer = Consumer(channel_or_connection, ...)
consumer.register_callback(my_callback)
consumer.consume()
# ....
consumer.cancel()
`Exchange` and `Queue` are simply declarations that can be pickled
and used in configuration files etc.
They also support operations, but to do so they need to be bound
to a channel.
Binding exchanges and queues to a connection will make it use
that connections default channel.
::
>>> exchange = Exchange('tasks', 'direct')
>>> connection = Connection()
>>> bound_exchange = exchange(connection)
>>> bound_exchange.delete()
# the original exchange is not affected, and stays unbound.
>>> exchange.delete()
raise NotBoundError: Can't call delete on Exchange not bound to
a channel.
Installation
============
You can install `Kombu` either via the Python Package Index (PyPI)
or from source.
To install using `pip`,::
$ pip install kombu
To install using `easy_install`,::
$ easy_install kombu
If you have downloaded a source tarball you can install it
by doing the following,::
$ python setup.py build
# python setup.py install # as root
Terminology
===========
There are some concepts you should be familiar with before starting:
* Producers
Producers sends messages to an exchange.
* Exchanges
Messages are sent to exchanges. Exchanges are named and can be
configured to use one of several routing algorithms. The exchange
routes the messages to consumers by matching the routing key in the
message with the routing key the consumer provides when binding to
the exchange.
* Consumers
Consumers declares a queue, binds it to a exchange and receives
messages from it.
* Queues
Queues receive messages sent to exchanges. The queues are declared
by consumers.
* Routing keys
Every message has a routing key. The interpretation of the routing
key depends on the exchange type. There are four default exchange
types defined by the AMQP standard, and vendors can define custom
types (so see your vendors manual for details).
These are the default exchange types defined by AMQP/0.8:
* Direct exchange
Matches if the routing key property of the message and
the `routing_key` attribute of the consumer are identical.
* Fan-out exchange
Always matches, even if the binding does not have a routing
key.
* Topic exchange
Matches the routing key property of the message by a primitive
pattern matching scheme. The message routing key then consists
of words separated by dots (`"."`, like domain names), and
two special characters are available; star (`"*"`) and hash
(`"#"`). The star matches any word, and the hash matches
zero or more words. For example `"*.stock.#"` matches the
routing keys `"usd.stock"` and `"eur.stock.db"` but not
`"stock.nasdaq"`.
Getting Help
============
Mailing list
------------
Join the `carrot-users`_ mailing list.
.. _`carrot-users`: http://groups.google.com/group/carrot-users/
Bug tracker
===========
If you have any suggestions, bug reports or annoyances please report them
to our issue tracker at http://github.com/celery/kombu/issues/
Contributing
============
Development of `Kombu` happens at Github: http://github.com/celery/kombu
You are highly encouraged to participate in the development. If you don't
like Github (for some reason) you're welcome to send regular patches.
License
=======
This software is licensed under the `New BSD License`. See the `LICENSE`
file in the top distribution directory for the full license text.
.. image:: https://d2weczhvl823v0.cloudfront.net/celery/kombu/trend.png
:alt: Bitdeli badge
:target: https://bitdeli.com/free
Platform: any
Classifier: Development Status :: 5 - Production/Stable
Classifier: License :: OSI Approved :: BSD License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 2.6
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Programming Language :: Python :: Implementation :: Jython
Classifier: Intended Audience :: Developers
Classifier: Topic :: Communications
Classifier: Topic :: System :: Distributed Computing
Classifier: Topic :: System :: Networking
Classifier: Topic :: Software Development :: Libraries :: Python Modules

327
README.rst Normal file
View File

@ -0,0 +1,327 @@
.. _kombu-index:
========================================
kombu - Messaging library for Python
========================================
:Version: 3.0.21
`Kombu` is a messaging library for Python.
The aim of `Kombu` is to make messaging in Python as easy as possible by
providing an idiomatic high-level interface for the AMQ protocol, and also
provide proven and tested solutions to common messaging problems.
`AMQP`_ is the Advanced Message Queuing Protocol, an open standard protocol
for message orientation, queuing, routing, reliability and security,
for which the `RabbitMQ`_ messaging server is the most popular implementation.
Features
========
* Allows application authors to support several message server
solutions by using pluggable transports.
* AMQP transport using the `py-amqp`_ or `librabbitmq`_ client libraries.
* High performance AMQP transport written in C - when using `librabbitmq`_
This is automatically enabled if librabbitmq is installed::
$ pip install librabbitmq
* Virtual transports makes it really easy to add support for non-AMQP
transports. There is already built-in support for `Redis`_,
`Beanstalk`_, `Amazon SQS`_, `CouchDB`_, `MongoDB`_, `ZeroMQ`_,
`ZooKeeper`_, `SoftLayer MQ`_ and `Pyro`_.
* You can also use the SQLAlchemy and Django ORM transports to
use a database as the broker.
* In-memory transport for unit testing.
* Supports automatic encoding, serialization and compression of message
payloads.
* Consistent exception handling across transports.
* The ability to ensure that an operation is performed by gracefully
handling connection and channel errors.
* Several annoyances with `amqplib`_ has been fixed, like supporting
timeouts and the ability to wait for events on more than one channel.
* Projects already using `carrot`_ can easily be ported by using
a compatibility layer.
For an introduction to AMQP you should read the article `Rabbits and warrens`_,
and the `Wikipedia article about AMQP`_.
.. _`RabbitMQ`: http://www.rabbitmq.com/
.. _`AMQP`: http://amqp.org
.. _`py-amqp`: http://pypi.python.org/pypi/amqp/
.. _`Redis`: http://code.google.com/p/redis/
.. _`Amazon SQS`: http://aws.amazon.com/sqs/
.. _`MongoDB`: http://www.mongodb.org/
.. _`CouchDB`: http://couchdb.apache.org/
.. _`ZeroMQ`: http://zeromq.org/
.. _`Zookeeper`: https://zookeeper.apache.org/
.. _`Beanstalk`: http://kr.github.com/beanstalkd/
.. _`Rabbits and warrens`: http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
.. _`amqplib`: http://barryp.org/software/py-amqplib/
.. _`Wikipedia article about AMQP`: http://en.wikipedia.org/wiki/AMQP
.. _`carrot`: http://pypi.python.org/pypi/carrot/
.. _`librabbitmq`: http://pypi.python.org/pypi/librabbitmq
.. _`Pyro`: http://pythonhosting.org/Pyro
.. _`SoftLayer MQ`: http://www.softlayer.com/services/additional/message-queue
.. _transport-comparison:
Transport Comparison
====================
+---------------+----------+------------+------------+---------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** |
+---------------+----------+------------+------------+---------------+
| *amqp* | Native | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) |
+---------------+----------+------------+------------+---------------+
| *mongodb* | Virtual | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
+---------------+----------+------------+------------+---------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
.. [#f1] Declarations only kept in memory, so exchanges/queues
must be declared by all clients that needs them.
.. [#f2] Fanout supported via storing routing tables in SimpleDB.
Disabled by default, but can be enabled by using the
``supports_fanout`` transport option.
Documentation
-------------
Kombu is using Sphinx, and the latest documentation can be found here:
http://kombu.readthedocs.org/
Quick overview
--------------
::
from kombu import Connection, Exchange, Queue
media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
def process_media(body, message):
print body
message.ack()
# connections
with Connection('amqp://guest:guest@localhost//') as conn:
# produce
producer = conn.Producer(serializer='json')
producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
exchange=media_exchange, routing_key='video',
declare=[video_queue])
# the declare above, makes sure the video queue is declared
# so that the messages can be delivered.
# It's a best practice in Kombu to have both publishers and
# consumers declare the queue. You can also declare the
# queue manually using:
# video_queue(conn).declare()
# consume
with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
# Process messages and handle events on all channels
while True:
conn.drain_events()
# Consume from several queues on the same channel:
video_queue = Queue('video', exchange=media_exchange, key='video')
image_queue = Queue('image', exchange=media_exchange, key='image')
with connection.Consumer([video_queue, image_queue],
callbacks=[process_media]) as consumer:
while True:
connection.drain_events()
Or handle channels manually::
with connection.channel() as channel:
producer = Producer(channel, ...)
consumer = Producer(channel)
All objects can be used outside of with statements too,
just remember to close the objects after use::
from kombu import Connection, Consumer, Producer
connection = Connection()
# ...
connection.release()
consumer = Consumer(channel_or_connection, ...)
consumer.register_callback(my_callback)
consumer.consume()
# ....
consumer.cancel()
`Exchange` and `Queue` are simply declarations that can be pickled
and used in configuration files etc.
They also support operations, but to do so they need to be bound
to a channel.
Binding exchanges and queues to a connection will make it use
that connections default channel.
::
>>> exchange = Exchange('tasks', 'direct')
>>> connection = Connection()
>>> bound_exchange = exchange(connection)
>>> bound_exchange.delete()
# the original exchange is not affected, and stays unbound.
>>> exchange.delete()
raise NotBoundError: Can't call delete on Exchange not bound to
a channel.
Installation
============
You can install `Kombu` either via the Python Package Index (PyPI)
or from source.
To install using `pip`,::
$ pip install kombu
To install using `easy_install`,::
$ easy_install kombu
If you have downloaded a source tarball you can install it
by doing the following,::
$ python setup.py build
# python setup.py install # as root
Terminology
===========
There are some concepts you should be familiar with before starting:
* Producers
Producers sends messages to an exchange.
* Exchanges
Messages are sent to exchanges. Exchanges are named and can be
configured to use one of several routing algorithms. The exchange
routes the messages to consumers by matching the routing key in the
message with the routing key the consumer provides when binding to
the exchange.
* Consumers
Consumers declares a queue, binds it to a exchange and receives
messages from it.
* Queues
Queues receive messages sent to exchanges. The queues are declared
by consumers.
* Routing keys
Every message has a routing key. The interpretation of the routing
key depends on the exchange type. There are four default exchange
types defined by the AMQP standard, and vendors can define custom
types (so see your vendors manual for details).
These are the default exchange types defined by AMQP/0.8:
* Direct exchange
Matches if the routing key property of the message and
the `routing_key` attribute of the consumer are identical.
* Fan-out exchange
Always matches, even if the binding does not have a routing
key.
* Topic exchange
Matches the routing key property of the message by a primitive
pattern matching scheme. The message routing key then consists
of words separated by dots (`"."`, like domain names), and
two special characters are available; star (`"*"`) and hash
(`"#"`). The star matches any word, and the hash matches
zero or more words. For example `"*.stock.#"` matches the
routing keys `"usd.stock"` and `"eur.stock.db"` but not
`"stock.nasdaq"`.
Getting Help
============
Mailing list
------------
Join the `carrot-users`_ mailing list.
.. _`carrot-users`: http://groups.google.com/group/carrot-users/
Bug tracker
===========
If you have any suggestions, bug reports or annoyances please report them
to our issue tracker at http://github.com/celery/kombu/issues/
Contributing
============
Development of `Kombu` happens at Github: http://github.com/celery/kombu
You are highly encouraged to participate in the development. If you don't
like Github (for some reason) you're welcome to send regular patches.
License
=======
This software is licensed under the `New BSD License`. See the `LICENSE`
file in the top distribution directory for the full license text.
.. image:: https://d2weczhvl823v0.cloudfront.net/celery/kombu/trend.png
:alt: Bitdeli badge
:target: https://bitdeli.com/free

32
THANKS Normal file
View File

@ -0,0 +1,32 @@
========
THANKS
========
From ``carrot`` THANKS file
===========================
* Thanks to Barry Pederson <bp@barryp.org> for the py-amqplib library.
* Thanks to Grégoire Cachet <gregoire@audacy.fr> for bug reports.
* Thanks to Martin Mahner for the Sphinx theme.
* Thanks to jcater for bug reports.
* Thanks to sebest for bug reports.
* Thanks to greut for bug reports
From ``django-kombu`` THANKS file
=================================
* Thanks to Rajesh Dhawan and other authors of django-queue-service
for the database model implementation.
See http://code.google.com/p/django-queue-service/.
From ``kombu-sqlalchemy`` THANKS file
=====================================
* Thanks to Rajesh Dhawan and other authors of django-queue-service
for the database model implementation.
See http://code.google.com/p/django-queue-service/.
* Thanks to haridsv for the draft SQLAlchemy port (which can still
be found at http://github.com/haridsv/celery-alchemy-poc)

2
TODO Normal file
View File

@ -0,0 +1,2 @@
Please see our Issue Tracker at GitHub:
http://github.com/celery/kombu/issues

0
docs/.static/.keep Normal file
View File

View File

@ -0,0 +1,7 @@
<h3>Kombu</h3>
<p>
Kombu is a messaging library for Python.
</p>
<p class="logo"><a href="{{ pathto(master_doc) }}">
<img width="128" height="128" class="logo" src="http://cloud.github.com/downloads/celery/kombu/kombusmall.jpg" alt="Logo"/>
</a></p>

View File

@ -0,0 +1,3 @@
<p class="logo"><a href="{{ pathto(master_doc) }}">
<img class="logo" width="128" height="128" src="http://cloud.github.com/downloads/celery/kombu/kombusmall.jpg" alt="Logo"/>
</a></p>

75
docs/Makefile Normal file
View File

@ -0,0 +1,75 @@
# Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d .build/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
.PHONY: help clean html web pickle htmlhelp latex changes linkcheck
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " html to make standalone HTML files"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " changes to make an overview over all changed/added/deprecated items"
@echo " linkcheck to check all external links for integrity"
clean:
-rm -rf .build/*
html:
mkdir -p .build/html .build/doctrees
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) .build/html
@echo
@echo "Build finished. The HTML pages are in .build/html."
pickle:
mkdir -p .build/pickle .build/doctrees
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) .build/pickle
@echo
@echo "Build finished; now you can process the pickle files."
web: pickle
json:
mkdir -p .build/json .build/doctrees
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) .build/json
@echo
@echo "Build finished; now you can process the JSON files."
htmlhelp:
mkdir -p .build/htmlhelp .build/doctrees
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) .build/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in .build/htmlhelp."
latex:
mkdir -p .build/latex .build/doctrees
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) .build/latex
@echo
@echo "Build finished; the LaTeX files are in .build/latex."
@echo "Run \`make all-pdf' or \`make all-ps' in that directory to" \
"run these through (pdf)latex."
changes:
mkdir -p .build/changes .build/doctrees
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) .build/changes
@echo
@echo "The overview file is in .build/changes."
linkcheck:
mkdir -p .build/linkcheck .build/doctrees
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) .build/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in .build/linkcheck/output.txt."

90
docs/_ext/applyxrefs.py Normal file
View File

@ -0,0 +1,90 @@
"""Adds xref targets to the top of files."""
import sys
import os
testing = False
DONT_TOUCH = ('./index.txt', )
def target_name(fn):
if fn.endswith('.txt'):
fn = fn[:-4]
return '_' + fn.lstrip('./').replace('/', '-')
def process_file(fn, lines):
lines.insert(0, '\n')
lines.insert(0, '.. %s:\n' % target_name(fn))
try:
f = open(fn, 'w')
except IOError:
print("Can't open %s for writing. Not touching it." % fn)
return
try:
f.writelines(lines)
except IOError:
print("Can't write to %s. Not touching it." % fn)
finally:
f.close()
def has_target(fn):
try:
f = open(fn, 'r')
except IOError:
print("Can't open %s. Not touching it." % fn)
return (True, None)
readok = True
try:
lines = f.readlines()
except IOError:
print("Can't read %s. Not touching it." % fn)
readok = False
finally:
f.close()
if not readok:
return (True, None)
#print fn, len(lines)
if len(lines) < 1:
print("Not touching empty file %s." % fn)
return (True, None)
if lines[0].startswith('.. _'):
return (True, None)
return (False, lines)
def main(argv=None):
if argv is None:
argv = sys.argv
if len(argv) == 1:
argv.extend('.')
files = []
for root in argv[1:]:
for (dirpath, dirnames, filenames) in os.walk(root):
files.extend([(dirpath, f) for f in filenames])
files.sort()
files = [os.path.join(p, fn) for p, fn in files if fn.endswith('.txt')]
#print files
for fn in files:
if fn in DONT_TOUCH:
print("Skipping blacklisted file %s." % fn)
continue
target_found, lines = has_target(fn)
if not target_found:
if testing:
print '%s: %s' % (fn, lines[0]),
else:
print "Adding xref to %s" % fn
process_file(fn, lines)
else:
print "Skipping %s: already has a xref" % fn
if __name__ == '__main__':
sys.exit(main())

View File

@ -0,0 +1,180 @@
"""
Runs through a reST file looking for old-style literals, and helps replace them
with new-style references.
"""
import re
import sys
import shelve
try:
input = input
except NameError:
input = raw_input # noqa
refre = re.compile(r'``([^`\s]+?)``')
ROLES = (
'attr',
'class',
"djadmin",
'data',
'exc',
'file',
'func',
'lookup',
'meth',
'mod',
"djadminopt",
"ref",
"setting",
"term",
"tfilter",
"ttag",
# special
"skip",
)
ALWAYS_SKIP = [
"NULL",
"True",
"False",
]
def fixliterals(fname):
data = open(fname).read()
last = 0
new = []
storage = shelve.open("/tmp/literals_to_xref.shelve")
lastvalues = storage.get("lastvalues", {})
for m in refre.finditer(data):
new.append(data[last:m.start()])
last = m.end()
line_start = data.rfind("\n", 0, m.start())
line_end = data.find("\n", m.end())
prev_start = data.rfind("\n", 0, line_start)
next_end = data.find("\n", line_end + 1)
# Skip always-skip stuff
if m.group(1) in ALWAYS_SKIP:
new.append(m.group(0))
continue
# skip when the next line is a title
next_line = data[m.end():next_end].strip()
if next_line[0] in "!-/:-@[-`{-~" and \
all(c == next_line[0] for c in next_line):
new.append(m.group(0))
continue
sys.stdout.write("\n" + "-" * 80 + "\n")
sys.stdout.write(data[prev_start + 1:m.start()])
sys.stdout.write(colorize(m.group(0), fg="red"))
sys.stdout.write(data[m.end():next_end])
sys.stdout.write("\n\n")
replace_type = None
while replace_type is None:
replace_type = input(
colorize("Replace role: ", fg="yellow")).strip().lower()
if replace_type and replace_type not in ROLES:
replace_type = None
if replace_type == "":
new.append(m.group(0))
continue
if replace_type == "skip":
new.append(m.group(0))
ALWAYS_SKIP.append(m.group(1))
continue
default = lastvalues.get(m.group(1), m.group(1))
if default.endswith("()") and \
replace_type in ("class", "func", "meth"):
default = default[:-2]
replace_value = input(
colorize("Text <target> [", fg="yellow") +
default +
colorize("]: ", fg="yellow"),
).strip()
if not replace_value:
replace_value = default
new.append(":%s:`%s`" % (replace_type, replace_value))
lastvalues[m.group(1)] = replace_value
new.append(data[last:])
open(fname, "w").write("".join(new))
storage["lastvalues"] = lastvalues
storage.close()
def colorize(text='', opts=(), **kwargs):
"""
Returns your text, enclosed in ANSI graphics codes.
Depends on the keyword arguments 'fg' and 'bg', and the contents of
the opts tuple/list.
Returns the RESET code if no parameters are given.
Valid colors:
'black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white'
Valid options:
'bold'
'underscore'
'blink'
'reverse'
'conceal'
'noreset' - string will not be auto-terminated with the RESET code
Examples:
colorize('hello', fg='red', bg='blue', opts=('blink',))
colorize()
colorize('goodbye', opts=('underscore',))
print colorize('first line', fg='red', opts=('noreset',))
print 'this should be red too'
print colorize('and so should this')
print 'this should not be red'
"""
color_names = ('black', 'red', 'green', 'yellow',
'blue', 'magenta', 'cyan', 'white')
foreground = dict([(color_names[x], '3%s' % x) for x in range(8)])
background = dict([(color_names[x], '4%s' % x) for x in range(8)])
RESET = '0'
opt_dict = {'bold': '1',
'underscore': '4',
'blink': '5',
'reverse': '7',
'conceal': '8'}
text = str(text)
code_list = []
if text == '' and len(opts) == 1 and opts[0] == 'reset':
return '\x1b[%sm' % RESET
for k, v in kwargs.iteritems():
if k == 'fg':
code_list.append(foreground[v])
elif k == 'bg':
code_list.append(background[v])
for o in opts:
if o in opt_dict:
code_list.append(opt_dict[o])
if 'noreset' not in opts:
text = text + '\x1b[%sm' % RESET
return ('\x1b[%sm' % ';'.join(code_list)) + text
if __name__ == '__main__':
try:
fixliterals(sys.argv[1])
except (KeyboardInterrupt, SystemExit):
print

394
docs/_theme/celery/static/celery.css_t vendored Normal file
View File

@ -0,0 +1,394 @@
/*
* celery.css_t
* ~~~~~~~~~~~~
*
* :copyright: Copyright 2010 by Armin Ronacher.
* :license: BSD, see LICENSE for details.
*/
{% set page_width = 940 %}
{% set sidebar_width = 220 %}
{% set body_font_stack = 'Optima, Segoe, "Segoe UI", Candara, Calibri, Arial, sans-serif' %}
{% set headline_font_stack = 'Futura, "Trebuchet MS", Arial, sans-serif' %}
{% set code_font_stack = "'Consolas', 'Menlo', 'Deja Vu Sans Mono', 'Bitstream Vera Sans Mono', monospace" %}
@import url("basic.css");
/* -- page layout ----------------------------------------------------------- */
body {
align: left;
font-family: {{ body_font_stack }};
font-size: 17px;
background-color: white;
color: #000;
margin: 30px 0 0 0;
padding: 0;
}
div.document {
width: {{ page_width }}px;
margin: 0 auto;
}
div.related {
width: {{ page_width - 20 }}px;
padding: 5px 10px;
background: #F2FCEE;
margin: 15px auto 15px auto;
}
div.documentwrapper {
float: left;
width: 100%;
}
div.bodywrapper {
margin: 0 0 0 {{ sidebar_width }}px;
}
div.sphinxsidebar {
width: {{ sidebar_width }}px;
}
hr {
border: 1px solid #B1B4B6;
}
div.body {
background-color: #ffffff;
color: #3E4349;
padding: 0 30px 0 30px;
}
img.celerylogo {
padding: 0 0 10px 10px;
float: right;
}
div.footer {
width: {{ page_width - 15 }}px;
margin: 10px auto 30px auto;
padding-right: 15px;
font-size: 14px;
color: #888;
text-align: right;
}
div.footer a {
color: #888;
}
div.sphinxsidebar a {
color: #444;
text-decoration: none;
border-bottom: 1px dashed #DCF0D5;
}
div.sphinxsidebar a:hover {
border-bottom: 1px solid #999;
}
div.sphinxsidebar {
font-size: 14px;
line-height: 1.5;
}
div.sphinxsidebarwrapper {
padding: 7px 10px;
}
div.sphinxsidebarwrapper p.logo {
padding: 0 0 20px 0;
margin: 0;
}
div.sphinxsidebar h3,
div.sphinxsidebar h4 {
font-family: {{ headline_font_stack }};
color: #444;
font-size: 24px;
font-weight: normal;
margin: 0 0 5px 0;
padding: 0;
}
div.sphinxsidebar h4 {
font-size: 20px;
}
div.sphinxsidebar h3 a {
color: #444;
}
div.sphinxsidebar p.logo a,
div.sphinxsidebar h3 a,
div.sphinxsidebar p.logo a:hover,
div.sphinxsidebar h3 a:hover {
border: none;
}
div.sphinxsidebar p {
color: #555;
margin: 10px 0;
}
div.sphinxsidebar ul {
margin: 10px 0;
padding: 0;
color: #000;
}
div.sphinxsidebar input {
border: 1px solid #ccc;
font-family: {{ body_font_stack }};
font-size: 1em;
}
/* -- body styles ----------------------------------------------------------- */
a {
color: #348613;
text-decoration: underline;
}
a:hover {
color: #59B833;
text-decoration: underline;
}
div.body h1,
div.body h2,
div.body h3,
div.body h4,
div.body h5,
div.body h6 {
font-family: {{ headline_font_stack }};
font-weight: normal;
margin: 30px 0px 10px 0px;
padding: 0;
}
div.body h1 { margin-top: 0; padding-top: 0; font-size: 200%; }
div.body h2 { font-size: 180%; }
div.body h3 { font-size: 150%; }
div.body h4 { font-size: 130%; }
div.body h5 { font-size: 100%; }
div.body h6 { font-size: 100%; }
div.body h1 a.toc-backref,
div.body h2 a.toc-backref,
div.body h3 a.toc-backref,
div.body h4 a.toc-backref,
div.body h5 a.toc-backref,
div.body h6 a.toc-backref {
color: inherit!important;
text-decoration: none;
}
a.headerlink {
color: #ddd;
padding: 0 4px;
text-decoration: none;
}
a.headerlink:hover {
color: #444;
background: #eaeaea;
}
div.body p, div.body dd, div.body li {
line-height: 1.4em;
}
div.admonition {
background: #fafafa;
margin: 20px -30px;
padding: 10px 30px;
border-top: 1px solid #ccc;
border-bottom: 1px solid #ccc;
}
div.admonition p.admonition-title {
font-family: {{ headline_font_stack }};
font-weight: normal;
font-size: 24px;
margin: 0 0 10px 0;
padding: 0;
line-height: 1;
}
div.admonition p.last {
margin-bottom: 0;
}
div.highlight{
background-color: white;
}
dt:target, .highlight {
background: #FAF3E8;
}
div.note {
background-color: #eee;
border: 1px solid #ccc;
}
div.seealso {
background-color: #ffc;
border: 1px solid #ff6;
}
div.topic {
background-color: #eee;
}
div.warning {
background-color: #ffe4e4;
border: 1px solid #f66;
}
p.admonition-title {
display: inline;
}
p.admonition-title:after {
content: ":";
}
pre, tt {
font-family: {{ code_font_stack }};
font-size: 0.9em;
}
img.screenshot {
}
tt.descname, tt.descclassname {
font-size: 0.95em;
}
tt.descname {
padding-right: 0.08em;
}
img.screenshot {
-moz-box-shadow: 2px 2px 4px #eee;
-webkit-box-shadow: 2px 2px 4px #eee;
box-shadow: 2px 2px 4px #eee;
}
table.docutils {
border: 1px solid #888;
-moz-box-shadow: 2px 2px 4px #eee;
-webkit-box-shadow: 2px 2px 4px #eee;
box-shadow: 2px 2px 4px #eee;
}
table.docutils td, table.docutils th {
border: 1px solid #888;
padding: 0.25em 0.7em;
}
table.field-list, table.footnote {
border: none;
-moz-box-shadow: none;
-webkit-box-shadow: none;
box-shadow: none;
}
table.footnote {
margin: 15px 0;
width: 100%;
border: 1px solid #eee;
background: #fdfdfd;
font-size: 0.9em;
}
table.footnote + table.footnote {
margin-top: -15px;
border-top: none;
}
table.field-list th {
padding: 0 0.8em 0 0;
}
table.field-list td {
padding: 0;
}
table.footnote td.label {
width: 0px;
padding: 0.3em 0 0.3em 0.5em;
}
table.footnote td {
padding: 0.3em 0.5em;
}
dl {
margin: 0;
padding: 0;
}
dl dd {
margin-left: 30px;
}
blockquote {
margin: 0 0 0 30px;
padding: 0;
}
ul {
margin: 10px 0 10px 30px;
padding: 0;
}
pre {
background: #F0FFEB;
padding: 7px 10px;
margin: 15px 0;
border: 1px solid #C7ECB8;
border-radius: 2px;
-moz-border-radius: 2px;
-webkit-border-radius: 2px;
line-height: 1.3em;
}
tt {
background: #F0FFEB;
color: #222;
/* padding: 1px 2px; */
}
tt.xref, a tt {
background: #F0FFEB;
border-bottom: 1px solid white;
}
a.reference {
text-decoration: none;
border-bottom: 1px dashed #DCF0D5;
}
a.reference:hover {
border-bottom: 1px solid #6D4100;
}
a.footnote-reference {
text-decoration: none;
font-size: 0.7em;
vertical-align: top;
border-bottom: 1px dashed #DCF0D5;
}
a.footnote-reference:hover {
border-bottom: 1px solid #6D4100;
}
a:hover tt {
background: #EEE;
}

5
docs/_theme/celery/theme.conf vendored Normal file
View File

@ -0,0 +1,5 @@
[theme]
inherit = basic
stylesheet = celery.css
[options]

3012
docs/changelog.rst Normal file

File diff suppressed because it is too large Load Diff

75
docs/conf.py Normal file
View File

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
import sys
import os
# If your extensions are in another directory, add it here. If the directory
# is relative to the documentation root, use os.path.abspath to make it
# absolute, like shown here.
sys.path.append(os.path.join(os.pardir, "tests"))
import kombu
from django.conf import settings
if not settings.configured:
settings.configure()
# General configuration
# ---------------------
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.coverage']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['.templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = 'Kombu'
copyright = '2009-2014, Ask Solem'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = ".".join(map(str, kombu.VERSION[0:2]))
# The full version, including alpha/beta/rc tags.
release = kombu.__version__
exclude_trees = ['.build']
# If true, '()' will be appended to :func: etc. cross-reference text.
add_function_parentheses = True
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'colorful'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['.static']
html_use_smartypants = True
# If false, no module index is generated.
html_use_modindex = True
# If false, no index is generated.
html_use_index = True
latex_documents = [
('index', 'Kombu.tex', 'Kombu Documentation',
'Ask Solem', 'manual'),
]
html_theme = "celery"
html_theme_path = ["_theme"]
html_sidebars = {
'index': ['sidebarintro.html', 'sourcelink.html', 'searchbox.html'],
'**': ['sidebarlogo.html', 'localtoc.html', 'relations.html',
'sourcelink.html', 'searchbox.html'],
}

16
docs/faq.rst Normal file
View File

@ -0,0 +1,16 @@
============================
Frequently Asked Questions
============================
Questions
=========
Q: Message.reject doesn't work?
--------------------------------------
**Answer**: Earlier versions of RabbitMQ did not implement ``basic.reject``,
so make sure your version is recent enough to support it.
Q: Message.requeue doesn't work?
--------------------------------------
**Answer**: See _`Message.reject doesn't work?`

BIN
docs/images/kombu.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 113 KiB

BIN
docs/images/kombusmall.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

26
docs/index.rst Normal file
View File

@ -0,0 +1,26 @@
Kombu Documentation
==================================
Contents:
.. toctree::
:maxdepth: 2
introduction
userguide/index
.. toctree::
:maxdepth: 1
faq
reference/index
changelog
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

327
docs/introduction.rst Normal file
View File

@ -0,0 +1,327 @@
.. _kombu-index:
========================================
kombu - Messaging library for Python
========================================
:Version: 3.0.21
`Kombu` is a messaging library for Python.
The aim of `Kombu` is to make messaging in Python as easy as possible by
providing an idiomatic high-level interface for the AMQ protocol, and also
provide proven and tested solutions to common messaging problems.
`AMQP`_ is the Advanced Message Queuing Protocol, an open standard protocol
for message orientation, queuing, routing, reliability and security,
for which the `RabbitMQ`_ messaging server is the most popular implementation.
Features
========
* Allows application authors to support several message server
solutions by using pluggable transports.
* AMQP transport using the `py-amqp`_ or `librabbitmq`_ client libraries.
* High performance AMQP transport written in C - when using `librabbitmq`_
This is automatically enabled if librabbitmq is installed::
$ pip install librabbitmq
* Virtual transports makes it really easy to add support for non-AMQP
transports. There is already built-in support for `Redis`_,
`Beanstalk`_, `Amazon SQS`_, `CouchDB`_, `MongoDB`_, `ZeroMQ`_,
`ZooKeeper`_, `SoftLayer MQ`_ and `Pyro`_.
* You can also use the SQLAlchemy and Django ORM transports to
use a database as the broker.
* In-memory transport for unit testing.
* Supports automatic encoding, serialization and compression of message
payloads.
* Consistent exception handling across transports.
* The ability to ensure that an operation is performed by gracefully
handling connection and channel errors.
* Several annoyances with `amqplib`_ has been fixed, like supporting
timeouts and the ability to wait for events on more than one channel.
* Projects already using `carrot`_ can easily be ported by using
a compatibility layer.
For an introduction to AMQP you should read the article `Rabbits and warrens`_,
and the `Wikipedia article about AMQP`_.
.. _`RabbitMQ`: http://www.rabbitmq.com/
.. _`AMQP`: http://amqp.org
.. _`py-amqp`: http://pypi.python.org/pypi/amqp/
.. _`Redis`: http://code.google.com/p/redis/
.. _`Amazon SQS`: http://aws.amazon.com/sqs/
.. _`MongoDB`: http://www.mongodb.org/
.. _`CouchDB`: http://couchdb.apache.org/
.. _`ZeroMQ`: http://zeromq.org/
.. _`Zookeeper`: https://zookeeper.apache.org/
.. _`Beanstalk`: http://kr.github.com/beanstalkd/
.. _`Rabbits and warrens`: http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
.. _`amqplib`: http://barryp.org/software/py-amqplib/
.. _`Wikipedia article about AMQP`: http://en.wikipedia.org/wiki/AMQP
.. _`carrot`: http://pypi.python.org/pypi/carrot/
.. _`librabbitmq`: http://pypi.python.org/pypi/librabbitmq
.. _`Pyro`: http://pythonhosting.org/Pyro
.. _`SoftLayer MQ`: http://www.softlayer.com/services/additional/message-queue
.. _transport-comparison:
Transport Comparison
====================
+---------------+----------+------------+------------+---------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** |
+---------------+----------+------------+------------+---------------+
| *amqp* | Native | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) |
+---------------+----------+------------+------------+---------------+
| *mongodb* | Virtual | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
+---------------+----------+------------+------------+---------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
.. [#f1] Declarations only kept in memory, so exchanges/queues
must be declared by all clients that needs them.
.. [#f2] Fanout supported via storing routing tables in SimpleDB.
Disabled by default, but can be enabled by using the
``supports_fanout`` transport option.
Documentation
-------------
Kombu is using Sphinx, and the latest documentation can be found here:
http://kombu.readthedocs.org/
Quick overview
--------------
::
from kombu import Connection, Exchange, Queue
media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
def process_media(body, message):
print body
message.ack()
# connections
with Connection('amqp://guest:guest@localhost//') as conn:
# produce
producer = conn.Producer(serializer='json')
producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
exchange=media_exchange, routing_key='video',
declare=[video_queue])
# the declare above, makes sure the video queue is declared
# so that the messages can be delivered.
# It's a best practice in Kombu to have both publishers and
# consumers declare the queue. You can also declare the
# queue manually using:
# video_queue(conn).declare()
# consume
with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
# Process messages and handle events on all channels
while True:
conn.drain_events()
# Consume from several queues on the same channel:
video_queue = Queue('video', exchange=media_exchange, key='video')
image_queue = Queue('image', exchange=media_exchange, key='image')
with connection.Consumer([video_queue, image_queue],
callbacks=[process_media]) as consumer:
while True:
connection.drain_events()
Or handle channels manually::
with connection.channel() as channel:
producer = Producer(channel, ...)
consumer = Producer(channel)
All objects can be used outside of with statements too,
just remember to close the objects after use::
from kombu import Connection, Consumer, Producer
connection = Connection()
# ...
connection.release()
consumer = Consumer(channel_or_connection, ...)
consumer.register_callback(my_callback)
consumer.consume()
# ....
consumer.cancel()
`Exchange` and `Queue` are simply declarations that can be pickled
and used in configuration files etc.
They also support operations, but to do so they need to be bound
to a channel.
Binding exchanges and queues to a connection will make it use
that connections default channel.
::
>>> exchange = Exchange('tasks', 'direct')
>>> connection = Connection()
>>> bound_exchange = exchange(connection)
>>> bound_exchange.delete()
# the original exchange is not affected, and stays unbound.
>>> exchange.delete()
raise NotBoundError: Can't call delete on Exchange not bound to
a channel.
Installation
============
You can install `Kombu` either via the Python Package Index (PyPI)
or from source.
To install using `pip`,::
$ pip install kombu
To install using `easy_install`,::
$ easy_install kombu
If you have downloaded a source tarball you can install it
by doing the following,::
$ python setup.py build
# python setup.py install # as root
Terminology
===========
There are some concepts you should be familiar with before starting:
* Producers
Producers sends messages to an exchange.
* Exchanges
Messages are sent to exchanges. Exchanges are named and can be
configured to use one of several routing algorithms. The exchange
routes the messages to consumers by matching the routing key in the
message with the routing key the consumer provides when binding to
the exchange.
* Consumers
Consumers declares a queue, binds it to a exchange and receives
messages from it.
* Queues
Queues receive messages sent to exchanges. The queues are declared
by consumers.
* Routing keys
Every message has a routing key. The interpretation of the routing
key depends on the exchange type. There are four default exchange
types defined by the AMQP standard, and vendors can define custom
types (so see your vendors manual for details).
These are the default exchange types defined by AMQP/0.8:
* Direct exchange
Matches if the routing key property of the message and
the `routing_key` attribute of the consumer are identical.
* Fan-out exchange
Always matches, even if the binding does not have a routing
key.
* Topic exchange
Matches the routing key property of the message by a primitive
pattern matching scheme. The message routing key then consists
of words separated by dots (`"."`, like domain names), and
two special characters are available; star (`"*"`) and hash
(`"#"`). The star matches any word, and the hash matches
zero or more words. For example `"*.stock.#"` matches the
routing keys `"usd.stock"` and `"eur.stock.db"` but not
`"stock.nasdaq"`.
Getting Help
============
Mailing list
------------
Join the `carrot-users`_ mailing list.
.. _`carrot-users`: http://groups.google.com/group/carrot-users/
Bug tracker
===========
If you have any suggestions, bug reports or annoyances please report them
to our issue tracker at http://github.com/celery/kombu/issues/
Contributing
============
Development of `Kombu` happens at Github: http://github.com/celery/kombu
You are highly encouraged to participate in the development. If you don't
like Github (for some reason) you're welcome to send regular patches.
License
=======
This software is licensed under the `New BSD License`. See the `LICENSE`
file in the top distribution directory for the full license text.
.. image:: https://d2weczhvl823v0.cloudfront.net/celery/kombu/trend.png
:alt: Bitdeli badge
:target: https://bitdeli.com/free

67
docs/reference/index.rst Normal file
View File

@ -0,0 +1,67 @@
===========================
API Reference
===========================
:Release: |version|
:Date: |today|
.. toctree::
:maxdepth: 2
kombu
kombu.common
kombu.mixins
kombu.simple
kombu.clocks
kombu.compat
kombu.pidbox
kombu.exceptions
kombu.log
kombu.connection
kombu.message
kombu.compression
kombu.pools
kombu.abstract
kombu.syn
kombu.async
kombu.async.hub
kombu.async.semaphore
kombu.async.timer
kombu.async.debug
kombu.transport
kombu.transport.pyamqp
kombu.transport.librabbitmq
kombu.transport.memory
kombu.transport.redis
kombu.transport.zmq
kombu.transport.beanstalk
kombu.transport.mongodb
kombu.transport.couchdb
kombu.transport.zookeeper
kombu.transport.filesystem
kombu.transport.django
kombu.transport.django.models
kombu.transport.django.managers
kombu.transport.django.management.commands.clean_kombu_messages
kombu.transport.sqlalchemy
kombu.transport.sqlalchemy.models
kombu.transport.SQS
kombu.transport.SLMQ
kombu.transport.pyro
kombu.transport.amqplib
kombu.transport.base
kombu.transport.virtual
kombu.transport.virtual.exchange
kombu.transport.virtual.scheduling
kombu.serialization
kombu.utils
kombu.utils.eventio
kombu.utils.limits
kombu.utils.compat
kombu.utils.debug
kombu.utils.encoding
kombu.utils.functional
kombu.utils.url
kombu.utils.text
kombu.utils.amq_manager
kombu.five

View File

@ -0,0 +1,10 @@
.. currentmodule:: kombu.abstract
.. automodule:: kombu.abstract
.. contents::
:local:
.. autoclass:: MaybeChannelBound
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Debugging Utils - kombu.async.debug
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.async.debug
.. automodule:: kombu.async.debug
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Event Loop Implementation - kombu.async.hub
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.async.hub
.. automodule:: kombu.async.hub
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Event Loop - kombu.async
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.async
.. automodule:: kombu.async
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Semaphores - kombu.async.semaphore
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.async.semaphore
.. automodule:: kombu.async.semaphore
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Timer - kombu.async.timer
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.async.timer
.. automodule:: kombu.async.timer
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Clocks and Synchronization - kombu.clocks
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.clocks
.. automodule:: kombu.clocks
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Common Utilities - kombu.common
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.common
.. automodule:: kombu.common
:members:
:undoc-members:

View File

@ -0,0 +1,36 @@
.. currentmodule:: kombu.compat
.. automodule:: kombu.compat
.. contents::
:local:
Publisher
---------
Replace with :class:`kombu.Producer`.
.. autoclass:: Publisher
:members:
:undoc-members:
:inherited-members:
Consumer
--------
Replace with :class:`kombu.Consumer`.
.. autoclass:: Consumer
:members:
:undoc-members:
:inherited-members:
ConsumerSet
-----------
Replace with :class:`kombu.Consumer`.
.. autoclass:: ConsumerSet
:members:
:undoc-members:
:inherited-members:

View File

@ -0,0 +1,20 @@
.. currentmodule:: kombu.compression
.. automodule:: kombu.compression
.. contents::
:local:
Encoding/decoding
-----------------
.. autofunction:: compress
.. autofunction:: decompress
Registry
--------
.. autofunction:: encoders
.. autofunction:: get_encoder
.. autofunction:: get_decoder
.. autofunction:: register

View File

@ -0,0 +1,40 @@
.. currentmodule:: kombu.connection
.. automodule:: kombu.connection
.. contents::
:local:
Connection
----------
.. autoclass:: Connection
:members:
:undoc-members:
Pools
-----
.. seealso::
The shortcut methods :meth:`Connection.Pool` and
:meth:`Connection.ChannelPool` is the recommended way
to instantiate these classes.
.. autoclass:: ConnectionPool
.. autoattribute:: LimitExceeded
.. automethod:: acquire
.. automethod:: release
.. automethod:: force_close_all
.. autoclass:: ChannelPool
.. autoattribute:: LimitExceeded
.. automethod:: acquire
.. automethod:: release
.. automethod:: force_close_all

View File

@ -0,0 +1,14 @@
.. currentmodule:: kombu.exceptions
.. automodule:: kombu.exceptions
.. contents::
:local:
.. autoexception:: NotBoundError
.. autoexception:: MessageStateError
.. autoexception:: TimeoutError
.. autoexception:: LimitExceeded
.. autoexception:: ConnectionLimitExceeded
.. autoexception:: ChannelLimitExceeded

View File

@ -0,0 +1,11 @@
==========================================================
Python2 to Python3 utilities - kombu.five
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.five
.. automodule:: kombu.five
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Logging - kombu.log
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.log
.. automodule:: kombu.log
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Message Objects - kombu.message
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.message
.. automodule:: kombu.message
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Mixin Classes - kombu.mixins
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.mixins
.. automodule:: kombu.mixins
:members:
:undoc-members:

View File

@ -0,0 +1,89 @@
.. currentmodule:: kombu.pidbox
.. automodule:: kombu.pidbox
.. contents::
:local:
Introduction
------------
Creating the applications Mailbox
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: python
>>> mailbox = pidbox.Mailbox("celerybeat", type="direct")
>>> @mailbox.handler
>>> def reload_schedule(state, **kwargs):
... state["beat"].reload_schedule()
>>> @mailbox.handler
>>> def connection_info(state, **kwargs):
... return {"connection": state["connection"].info()}
Example Node
~~~~~~~~~~~~
.. code-block:: python
>>> connection = kombu.Connection()
>>> state = {"beat": beat,
"connection": connection}
>>> consumer = mailbox(connection).Node(hostname).listen()
>>> try:
... while True:
... connection.drain_events(timeout=1)
... finally:
... consumer.cancel()
Example Client
~~~~~~~~~~~~~~
.. code-block:: python
>>> mailbox.cast("reload_schedule") # cast is async.
>>> info = celerybeat.call("connection_info", timeout=1)
Mailbox
-------
.. autoclass:: Mailbox
.. autoattribute:: namespace
.. autoattribute:: connection
.. autoattribute:: type
.. autoattribute:: exchange
.. autoattribute:: reply_exchange
.. automethod:: Node
.. automethod:: call
.. automethod:: cast
.. automethod:: abcast
.. automethod:: multi_call
.. automethod:: get_reply_queue
.. automethod:: get_queue
Node
----
.. autoclass:: Node
.. autoattribute:: hostname
.. autoattribute:: mailbox
.. autoattribute:: handlers
.. autoattribute:: state
.. autoattribute:: channel
.. automethod:: Consumer
.. automethod:: handler
.. automethod:: listen
.. automethod:: dispatch
.. automethod:: dispatch_from_message
.. automethod:: handle_call
.. automethod:: handle_cast
.. automethod:: handle
.. automethod:: handle_message
.. automethod:: reply

View File

@ -0,0 +1,11 @@
==========================================================
General Pools - kombu.pools
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.pools
.. automodule:: kombu.pools
:members:
:undoc-members:

187
docs/reference/kombu.rst Normal file
View File

@ -0,0 +1,187 @@
.. currentmodule:: kombu
.. contents::
:local:
.. automodule:: kombu
.. autofunction:: enable_insecure_serializers
.. autofunction:: disable_insecure_serializers
Connection
----------
.. autoclass:: Connection
.. admonition:: Attributes
.. autoattribute:: hostname
.. autoattribute:: port
.. autoattribute:: userid
.. autoattribute:: password
.. autoattribute:: virtual_host
.. autoattribute:: ssl
.. autoattribute:: login_method
.. autoattribute:: failover_strategy
.. autoattribute:: connect_timeout
.. autoattribute:: heartbeat
.. autoattribute:: default_channel
.. autoattribute:: connected
.. autoattribute:: recoverable_connection_errors
.. autoattribute:: recoverable_channel_errors
.. autoattribute:: connection_errors
.. autoattribute:: channel_errors
.. autoattribute:: transport
.. autoattribute:: connection
.. autoattribute:: uri_prefix
.. autoattribute:: declared_entities
.. autoattribute:: cycle
.. autoattribute:: host
.. autoattribute:: manager
.. autoattribute:: supports_heartbeats
.. autoattribute:: is_evented
.. admonition:: Methods
.. automethod:: as_uri
.. automethod:: connect
.. automethod:: channel
.. automethod:: drain_events
.. automethod:: release
.. automethod:: autoretry
.. automethod:: ensure_connection
.. automethod:: ensure
.. automethod:: revive
.. automethod:: create_transport
.. automethod:: get_transport_cls
.. automethod:: clone
.. automethod:: info
.. automethod:: switch
.. automethod:: maybe_switch_next
.. automethod:: heartbeat_check
.. automethod:: maybe_close_channel
.. automethod:: register_with_event_loop
.. automethod:: close
.. automethod:: _close
.. automethod:: completes_cycle
.. automethod:: get_manager
.. automethod:: Producer
.. automethod:: Consumer
.. automethod:: Pool
.. automethod:: ChannelPool
.. automethod:: SimpleQueue
.. automethod:: SimpleBuffer
Exchange
--------
Example creating an exchange declaration::
>>> news_exchange = Exchange('news', type='topic')
For now `news_exchange` is just a declaration, you can't perform
actions on it. It just describes the name and options for the exchange.
The exchange can be bound or unbound. Bound means the exchange is
associated with a channel and operations can be performed on it.
To bind the exchange you call the exchange with the channel as argument::
>>> bound_exchange = news_exchange(channel)
Now you can perform operations like :meth:`declare` or :meth:`delete`::
>>> bound_exchange.declare()
>>> message = bound_exchange.Message('Cure for cancer found!')
>>> bound_exchange.publish(message, routing_key='news.science')
>>> bound_exchange.delete()
.. autoclass:: Exchange
:members:
:undoc-members:
.. automethod:: maybe_bind
Queue
-----
Example creating a queue using our exchange in the :class:`Exchange`
example::
>>> science_news = Queue('science_news',
... exchange=news_exchange,
... routing_key='news.science')
For now `science_news` is just a declaration, you can't perform
actions on it. It just describes the name and options for the queue.
The queue can be bound or unbound. Bound means the queue is
associated with a channel and operations can be performed on it.
To bind the queue you call the queue instance with the channel as
an argument::
>>> bound_science_news = science_news(channel)
Now you can perform operations like :meth:`declare` or :meth:`purge`:
.. code-block:: python
>>> bound_science_news.declare()
>>> bound_science_news.purge()
>>> bound_science_news.delete()
.. autoclass:: Queue
:members:
:undoc-members:
.. automethod:: maybe_bind
Message Producer
----------------
.. autoclass:: Producer
.. autoattribute:: channel
.. autoattribute:: exchange
.. autoattribute:: routing_key
.. autoattribute:: serializer
.. autoattribute:: compression
.. autoattribute:: auto_declare
.. autoattribute:: on_return
.. autoattribute:: connection
.. automethod:: declare
.. automethod:: maybe_declare
.. automethod:: publish
.. automethod:: revive
Message Consumer
----------------
.. autoclass:: Consumer
.. autoattribute:: channel
.. autoattribute:: queues
.. autoattribute:: no_ack
.. autoattribute:: auto_declare
.. autoattribute:: callbacks
.. autoattribute:: on_message
.. autoattribute:: on_decode_error
.. autoattribute:: connection
.. automethod:: declare
.. automethod:: register_callback
.. automethod:: add_queue
.. automethod:: add_queue_from_dict
.. automethod:: consume
.. automethod:: cancel
.. automethod:: cancel_by_queue
.. automethod:: consuming_from
.. automethod:: purge
.. automethod:: flow
.. automethod:: qos
.. automethod:: recover
.. automethod:: receive
.. automethod:: revive

View File

@ -0,0 +1,47 @@
.. currentmodule:: kombu.serialization
.. automodule:: kombu.serialization
.. contents::
:local:
Overview
--------
Centralized support for encoding/decoding of data structures.
Contains json, pickle, msgpack, and yaml serializers.
Optionally installs support for YAML if the `PyYAML`_ package
is installed.
Optionally installs support for `msgpack`_ if the `msgpack-python`_
package is installed.
Exceptions
----------
.. autoexception:: SerializerNotInstalled
Serialization
-------------
.. autofunction:: encode
.. autofunction:: decode
.. autofunction:: raw_encode
Registry
--------
.. autofunction:: register
.. autodata:: registry
.. _`cjson`: http://pypi.python.org/pypi/python-cjson/
.. _`simplejson`: http://code.google.com/p/simplejson/
.. _`Python 2.6+`: http://docs.python.org/library/json.html
.. _`PyYAML`: http://pyyaml.org/
.. _`msgpack`: http://msgpack.sourceforge.net/
.. _`msgpack-python`: http://pypi.python.org/pypi/msgpack-python/

View File

@ -0,0 +1,89 @@
.. currentmodule:: kombu.simple
.. automodule:: kombu.simple
.. contents::
:local:
Persistent
----------
.. autoclass:: SimpleQueue
.. attribute:: channel
Current channel
.. attribute:: producer
:class:`~kombu.Producer` used to publish messages.
.. attribute:: consumer
:class:`~kombu.Consumer` used to receive messages.
.. attribute:: no_ack
flag to enable/disable acknowledgements.
.. attribute:: queue
:class:`~kombu.Queue` to consume from (if consuming).
.. attribute:: queue_opts
Additional options for the queue declaration.
.. attribute:: exchange_opts
Additional options for the exchange declaration.
.. automethod:: get
.. automethod:: get_nowait
.. automethod:: put
.. automethod:: clear
.. automethod:: __len__
.. automethod:: qsize
.. automethod:: close
Buffer
------
.. autoclass:: SimpleBuffer
.. attribute:: channel
Current channel
.. attribute:: producer
:class:`~kombu.Producer` used to publish messages.
.. attribute:: consumer
:class:`~kombu.Consumer` used to receive messages.
.. attribute:: no_ack
flag to enable/disable acknowledgements.
.. attribute:: queue
:class:`~kombu.Queue` to consume from (if consuming).
.. attribute:: queue_opts
Additional options for the queue declaration.
.. attribute:: exchange_opts
Additional options for the exchange declaration.
.. automethod:: get
.. automethod:: get_nowait
.. automethod:: put
.. automethod:: clear
.. automethod:: __len__
.. automethod:: qsize
.. automethod:: close

View File

@ -0,0 +1,11 @@
==========================================================
Async Utilities - kombu.syn
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.syn
.. automodule:: kombu.syn
.. autofunction:: detect_environment

View File

@ -0,0 +1,24 @@
======================================
kombu.transport.SLMQ
======================================
.. currentmodule:: kombu.transport.SLMQ
.. automodule:: kombu.transport.SLMQ
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -0,0 +1,20 @@
.. currentmodule:: kombu.transport.SQS
.. automodule:: kombu.transport.SQS
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -0,0 +1,36 @@
.. currentmodule:: kombu.transport.amqplib
.. automodule:: kombu.transport.amqplib
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Connection
----------
.. autoclass:: Connection
:members:
:undoc-members:
:inherited-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:
Message
-------
.. autoclass:: Message
:members:
:undoc-members:

View File

@ -0,0 +1,62 @@
.. currentmodule:: kombu.transport.base
.. automodule:: kombu.transport.base
.. contents::
:local:
Message
-------
.. autoclass:: Message
.. autoattribute:: payload
.. autoattribute:: channel
.. autoattribute:: delivery_tag
.. autoattribute:: content_type
.. autoattribute:: content_encoding
.. autoattribute:: delivery_info
.. autoattribute:: headers
.. autoattribute:: properties
.. autoattribute:: body
.. autoattribute:: acknowledged
.. automethod:: ack
.. automethod:: reject
.. automethod:: requeue
.. automethod:: decode
Transport
---------
.. autoclass:: Transport
.. autoattribute:: client
.. autoattribute:: default_port
.. attribute:: recoverable_connection_errors
Optional list of connection related exceptions that can be
recovered from, but where the connection must be closed
and re-established first.
If not defined then all :attr:`connection_errors` and
:class:`channel_errors` will be regarded as recoverable,
but needing to close the connection first.
.. attribute:: recoverable_channel_errors
Optional list of channel related exceptions that can be
automatically recovered from without re-establishing the
connection.
.. autoattribute:: connection_errors
.. autoattribute:: channel_errors
.. automethod:: establish_connection
.. automethod:: close_connection
.. automethod:: create_channel
.. automethod:: close_channel
.. automethod:: drain_events

View File

@ -0,0 +1,20 @@
.. currentmodule:: kombu.transport.beanstalk
.. automodule:: kombu.transport.beanstalk
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -0,0 +1,25 @@
.. currentmodule:: kombu.transport.couchdb
.. automodule:: kombu.transport.couchdb
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:
Functions
---------
.. autofunction:: create_message_view

View File

@ -0,0 +1,14 @@
==========================================================
Django Management - clean_kombu_messages
==========================================================
.. contents::
:local:
.. currentmodule::
kombu.transport.django.management.commands.clean_kombu_messages
.. automodule::
kombu.transport.django.management.commands.clean_kombu_messages
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Django Managers - kombu.transport.django.managers
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.transport.django.managers
.. automodule:: kombu.transport.django.managers
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Django Models - kombu.transport.django.models
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.transport.django.models
.. automodule:: kombu.transport.django.models
:members:
:undoc-members:

View File

@ -0,0 +1,24 @@
=========================================
kombu.transport.django
=========================================
.. currentmodule:: kombu.transport.django
.. automodule:: kombu.transport.django
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -0,0 +1,21 @@
.. currentmodule:: kombu.transport.filesystem
.. automodule:: kombu.transport.filesystem
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -0,0 +1,35 @@
.. currentmodule:: kombu.transport.librabbitmq
.. automodule:: kombu.transport.librabbitmq
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Connection
----------
.. autoclass:: Connection
:members:
:undoc-members:
:inherited-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:
Message
-------
.. autoclass:: Message
:members:
:undoc-members:

View File

@ -0,0 +1,20 @@
.. currentmodule:: kombu.transport.memory
.. automodule:: kombu.transport.memory
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -0,0 +1,20 @@
.. currentmodule:: kombu.transport.mongodb
.. automodule:: kombu.transport.mongodb
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -0,0 +1,36 @@
.. currentmodule:: kombu.transport.pyamqp
.. automodule:: kombu.transport.pyamqp
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Connection
----------
.. autoclass:: Connection
:members:
:undoc-members:
:inherited-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:
Message
-------
.. autoclass:: Message
:members:
:undoc-members:

View File

@ -0,0 +1,20 @@
.. currentmodule:: kombu.transport.pyro
.. automodule:: kombu.transport.pyro
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -0,0 +1,20 @@
.. currentmodule:: kombu.transport.redis
.. automodule:: kombu.transport.redis
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -0,0 +1,23 @@
.. currentmodule:: kombu.transport
.. automodule:: kombu.transport
.. contents::
:local:
Data
----
.. data:: DEFAULT_TRANSPORT
Default transport used when no transport specified.
.. data:: TRANSPORT_ALIASES
Mapping of transport aliases/class names.
Functions
---------
.. autofunction:: get_transport_cls
.. autofunction:: resolve_transport

View File

@ -0,0 +1,27 @@
.. currentmodule:: kombu.transport.sqlalchemy.models
.. automodule:: kombu.transport.sqlalchemy.models
.. contents::
:local:
Models
------
.. autoclass:: Queue
.. autoattribute:: Queue.id
.. autoattribute:: Queue.name
.. autoclass:: Message
.. autoattribute:: Message.id
.. autoattribute:: Message.visible
.. autoattribute:: Message.sent_at
.. autoattribute:: Message.payload
.. autoattribute:: Message.version

View File

@ -0,0 +1,25 @@
====================================
kombu.transport.sqlalchemy
====================================
.. currentmodule:: kombu.transport.sqlalchemy
.. automodule:: kombu.transport.sqlalchemy
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -0,0 +1,35 @@
.. currentmodule:: kombu.transport.virtual.exchange
.. automodule:: kombu.transport.virtual.exchange
.. contents::
:local:
Direct
------
.. autoclass:: DirectExchange
:members:
:undoc-members:
Topic
-----
.. autoclass:: TopicExchange
:members:
:undoc-members:
Fanout
------
.. autoclass:: FanoutExchange
:members:
:undoc-members:
Interface
---------
.. autoclass:: ExchangeType
:members:
:undoc-members:

View File

@ -0,0 +1,117 @@
.. currentmodule:: kombu.transport.virtual
.. automodule:: kombu.transport.virtual
.. contents::
:local:
Transports
----------
.. autoclass:: Transport
.. autoattribute:: Channel
.. autoattribute:: Cycle
.. autoattribute:: polling_interval
.. autoattribute:: default_port
.. autoattribute:: state
.. autoattribute:: cycle
.. automethod:: establish_connection
.. automethod:: close_connection
.. automethod:: create_channel
.. automethod:: close_channel
.. automethod:: drain_events
Channel
-------
.. autoclass:: AbstractChannel
:members:
.. autoclass:: Channel
.. autoattribute:: Message
.. autoattribute:: state
.. autoattribute:: qos
.. autoattribute:: do_restore
.. autoattribute:: exchange_types
.. automethod:: exchange_declare
.. automethod:: exchange_delete
.. automethod:: queue_declare
.. automethod:: queue_delete
.. automethod:: queue_bind
.. automethod:: queue_purge
.. automethod:: basic_publish
.. automethod:: basic_consume
.. automethod:: basic_cancel
.. automethod:: basic_get
.. automethod:: basic_ack
.. automethod:: basic_recover
.. automethod:: basic_reject
.. automethod:: basic_qos
.. automethod:: get_table
.. automethod:: typeof
.. automethod:: drain_events
.. automethod:: prepare_message
.. automethod:: message_to_python
.. automethod:: flow
.. automethod:: close
Message
-------
.. autoclass:: Message
:members:
:undoc-members:
:inherited-members:
Quality Of Service
------------------
.. autoclass:: QoS
:members:
:undoc-members:
:inherited-members:
In-memory State
---------------
.. autoclass:: BrokerState
:members:
:undoc-members:
:inherited-members:

View File

@ -0,0 +1,7 @@
.. contents::
:local:
.. currentmodule:: kombu.transport.virtual.scheduling
.. automodule:: kombu.transport.virtual.scheduling
:members:
:undoc-members:

View File

@ -0,0 +1,13 @@
=====================
kombu.transport.zmq
=====================
.. currentmodule:: kombu.transport.zmq
.. automodule:: kombu.transport.zmq
.. contents::
:local:
:members:
:undoc-members:

View File

@ -0,0 +1,25 @@
===========================
kombu.transport.zookeeper
===========================
.. currentmodule:: kombu.transport.zookeeper
.. automodule:: kombu.transport.zookeeper
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
====================================================
Generic RabbitMQ manager - kombu.utils.amq_manager
====================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.amq_manager
.. automodule:: kombu.utils.amq_manager
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Compat. utilities - kombu.utils.compat
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.compat
.. automodule:: kombu.utils.compat
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Debugging - kombu.utils.debug
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.debug
.. automodule:: kombu.utils.debug
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
String Encoding - kombu.utils.encoding
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.encoding
.. automodule:: kombu.utils.encoding
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Evented I/O - kombu.utils.eventio
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.eventio
.. automodule:: kombu.utils.eventio
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
kombu.utils.functional
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.functional
.. automodule:: kombu.utils.functional
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Rate limiting - kombu.utils.limits
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.limits
.. automodule:: kombu.utils.limits
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Utilities - kombu.utils
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils
.. automodule:: kombu.utils
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Text utilitites - kombu.utils.text
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.text
.. automodule:: kombu.utils.text
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==============================================
kombu.utils.url
==============================================
.. contents::
:local:
.. currentmodule:: kombu.utils.url
.. automodule:: kombu.utils.url
:members:
:undoc-members:

View File

@ -0,0 +1,178 @@
.. _guide-connections:
============================
Connections and transports
============================
.. _connection-basics:
Basics
======
To send and receive messages you need a transport and a connection.
There are several transports to choose from (amqp, librabbitmq, redis, in-memory, etc.),
and you can even create your own. The default transport is amqp.
Create a connection using the default transport::
>>> from kombu import Connection
>>> connection = Connection('amqp://guest:guest@localhost:5672//')
The connection will not be established yet, as the connection is established
when needed. If you want to explicitly establish the connection
you have to call the :meth:`~kombu.Connection.connect`
method::
>>> connection.connect()
You can also check whether the connection is connected::
>>> connection.connected
True
Connections must always be closed after use::
>>> connection.close()
But best practice is to release the connection instead,
this will release the resource if the connection is associated
with a connection pool, or close the connection if not,
and makes it easier to do the transition to connection pools later::
>>> connection.release()
.. seealso::
:ref:`guide-pools`
Of course, the connection can be used as a context, and you are
encouraged to do so as it makes it harder to forget releasing open
resources::
with Connection() as connection:
# work with connection
.. _connection-urls:
URLs
====
Connection parameters can be provided as an URL in the format::
transport://userid:password@hostname:port/virtual_host
All of these are valid URLs::
# Specifies using the amqp transport only, default values
# are taken from the keyword arguments.
amqp://
# Using Redis
redis://localhost:6379/
# Using Redis over a Unix socket
redis+socket:///tmp/redis.sock
# Using virtual host '/foo'
amqp://localhost//foo
# Using virtual host 'foo'
amqp://localhost/foo
The query part of the URL can also be used to set options, e.g.::
amqp://localhost/myvhost?ssl=1
See :ref:`connection-options` for a list of supported options.
A connection without options will use the default connection settings,
which is using the localhost host, default port, user name `guest`,
password `guest` and virtual host "/". A connection without arguments
is the same as::
>>> Connection('amqp://guest:guest@localhost:5672//')
The default port is transport specific, for AMQP this is 5672.
Other fields may also have different meaning depending on the transport
used. For example, the Redis transport uses the `virtual_host` argument as
the redis database number.
.. _connection-options:
Keyword arguments
=================
The :class:`~kombu.Connection` class supports additional
keyword arguments, these are:
:hostname: Default host name if not provided in the URL.
:userid: Default user name if not provided in the URL.
:password: Default password if not provided in the URL.
:virtual_host: Default virtual host if not provided in the URL.
:port: Default port if not provided in the URL.
:transport: Default transport if not provided in the URL.
Can be a string specifying the path to the class. (e.g.
``kombu.transport.pyamqp:Transport``), or one of the aliases:
``pyamqp``, ``librabbitmq``, ``redis``, ``memory``, and so on.
:ssl: Use SSL to connect to the server. Default is ``False``.
Only supported by the amqp transport.
:insist: Insist on connecting to a server.
*No longer supported, relic from AMQP 0.8*
:connect_timeout: Timeout in seconds for connecting to the
server. May not be supported by the specified transport.
:transport_options: A dict of additional connection arguments to
pass to alternate kombu channel implementations. Consult the transport
documentation for available options.
AMQP Transports
===============
There are 3 transports available for AMQP use.
1. ``pyamqp`` uses the pure Python library ``amqp``, automatically
installed with Kombu.
2. ``librabbitmq`` uses the high performance transport written in C.
This requires the ``librabbitmq`` Python package to be installed, which
automatically compiles the C library.
3. ``amqp`` tries to use ``librabbitmq`` but falls back to ``pyamqp``.
For the highest performance, you should install the ``librabbitmq`` package.
To ensure librabbitmq is used, you can explicitly specify it in the
transport URL, or use ``amqp`` to have the fallback.
Transport Comparison
====================
+---------------+----------+------------+------------+---------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** |
+---------------+----------+------------+------------+---------------+
| *amqp* | Native | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) |
+---------------+----------+------------+------------+---------------+
| *mongodb* | Virtual | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
+---------------+----------+------------+------------+---------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
.. [#f1] Declarations only kept in memory, so exchanges/queues
must be declared by all clients that needs them.
.. [#f2] Fanout supported via storing routing tables in SimpleDB.
Disabled by default, but can be enabled by using the
``supports_fanout`` transport option.

View File

@ -0,0 +1,103 @@
.. _guide-consumers:
===========
Consumers
===========
.. _consumer-basics:
Basics
======
The :class:`Consumer` takes a connection (or channel) and a list of queues to
consume from. Several consumers can be mixed to consume from different
channels, as they all bind to the same connection, and ``drain_events`` will
drain events from all channels on that connection.
.. note::
Kombu since 3.0 will only accept json/binary or text messages by default,
to allow deserialization of other formats you have to specify them
in the ``accept`` argument::
Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml'])
Draining events from a single consumer:
.. code-block:: python
with Consumer(connection, queues, accept=['json']):
connection.drain_events(timeout=1)
Draining events from several consumers:
.. code-block:: python
from kombu.utils import nested
with connection.channel(), connection.channel() as (channel1, channel2):
with nested(Consumer(channel1, queues1, accept=['json']),
Consumer(channel2, queues2, accept=['json'])):
connection.drain_events(timeout=1)
Or using :class:`~kombu.mixins.ConsumerMixin`:
.. code-block:: python
from kombu.mixins import ConsumerMixin
class C(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [
Consumer(queues, callbacks=[self.on_message], accept=['json']),
]
def on_message(self, body, message):
print("RECEIVED MESSAGE: %r" % (body, ))
message.ack()
C(connection).run()
and with multiple channels again:
.. code-block:: python
from kombu import Consumer
from kombu.mixins import ConsumerMixin
class C(ConsumerMixin):
channel2 = None
def __init__(self, connection):
self.connection = connection
def get_consumers(self, _, default_channel):
self.channel2 = default_channel.connection.channel()
return [Consumer(default_channel, queues1,
callbacks=[self.on_message],
accept=['json']),
Consumer(self.channel2, queues2,
callbacks=[self.on_special_message],
accept=['json'])]
def on_consumer_end(self, connection, default_channel):
if self.channel2:
self.channel2.close()
C(connection).run()
Reference
=========
.. autoclass:: kombu.Consumer
:noindex:
:members:

View File

@ -0,0 +1,57 @@
.. _examples:
========================
Examples
========================
.. _hello-world-example:
Hello World Example
===================
Below example uses
:ref:`guide-simple`
to send helloworld message through
message broker (rabbitmq) and print received message
:file:`hello_publisher.py`:
.. literalinclude:: ../../examples/hello_publisher.py
:language: python
:file:`hello_consumer.py`:
.. literalinclude:: ../../examples/hello_consumer.py
:language: python
.. _task-queue-example:
Task Queue Example
==================
Very simple task queue using pickle, with primitive support
for priorities using different queues.
:file:`queues.py`:
.. literalinclude:: ../../examples/simple_task_queue/queues.py
:language: python
:file:`worker.py`:
.. literalinclude:: ../../examples/simple_task_queue/worker.py
:language: python
:file:`tasks.py`:
.. literalinclude:: ../../examples/simple_task_queue/tasks.py
:language: python
.. code-block:: python
:file:`client.py`:
.. literalinclude:: ../../examples/simple_task_queue/client.py

18
docs/userguide/index.rst Normal file
View File

@ -0,0 +1,18 @@
============
User Guide
============
:Release: |version|
:Date: |today|
.. toctree::
:maxdepth: 2
introduction
connections
producers
consumers
examples
simple
pools
serialization

View File

@ -0,0 +1,100 @@
.. _guide-intro:
==============
Introduction
==============
.. _intro-messaging:
What is messaging?
==================
In times long ago people didn't have email.
They had the postal service, which with great courage would deliver mail
from hand to hand all over the globe. Soldiers deployed at wars far away could only
communicate with their families through the postal service, and
posting a letter would mean that the recipient wouldn't actually
receive the letter until weeks or months, sometimes years later.
It's hard to imagine this today when people are expected to be available
for phone calls every minute of the day.
So humans need to communicate with each other, this shouldn't
be news to anyone, but why would applications?
One example is banks.
When you transfer money from one bank to another, your bank sends
a message to a central clearinghouse. The clearinghouse
then records and coordinates the transaction. Banks
need to send and receive millions and millions of
messages every day, and losing a single message would mean either losing
your money (bad) or the banks money (very bad)
Another example is the stock exchanges, which also have a need
for very high message throughputs and have strict reliability
requirements.
Email is a great way for people to communicate. It is much faster
than using the postal service, but still using email as a means for
programs to communicate would be like the soldier above, waiting
for signs of life from his girlfriend back home.
.. _messaging-scenarios:
Messaging Scenarios
===================
* Request/Reply
The request/reply pattern works like the postal service example.
A message is addressed to a single recipient, with a return address
printed on the back. The recipient may or may not reply to the
message by sending it back to the original sender.
Request-Reply is achieved using *direct* exchanges.
* Broadcast
In a broadcast scenario a message is sent to all parties.
This could be none, one or many recipients.
Broadcast is achieved using *fanout* exchanges.
* Publish/Subscribe
In a publish/subscribe scenario producers publish messages
to topics, and consumers subscribe to the topics they are
interested in.
If no consumers subscribe to the topic, then the message
will not be delivered to anyone. If several consumers
subscribe to the topic, then the message will be delivered
to all of them.
Pub-sub is achieved using *topic* exchanges.
.. _messaging-reliability:
Reliability
===========
For some applications reliability is very important. Losing a message is
a critical situation that must never happen. For other applications
losing a message is fine, it can maybe recover in other ways,
or the message is resent anyway as periodic updates.
AMQP defines two built-in delivery modes:
* persistent
Messages are written to disk and survives a broker restart.
* transient
Messages may or may not be written to disk, as the broker sees fit
to optimize memory contents. The messages will not survive a broker
restart.
Transient messaging is by far the fastest way to send and receive messages,
so having persistent messages comes with a price, but for some
applications this is a necessary cost.

175
docs/userguide/pools.rst Normal file
View File

@ -0,0 +1,175 @@
.. _guide-pools:
===============================
Connection and Producer Pools
===============================
.. _default-pools:
Default Pools
=============
Kombu ships with two global pools: one connection pool,
and one producer pool.
These are convenient and the fact that they are global
may not be an issue as connections should often be limited
at the process level, rather than per thread/application
and so on, but if you need custom pools per thread
see :ref:`custom-pool-groups`.
.. _default-connections:
The connection pool group
-------------------------
The connection pools are available as :attr:`kombu.pools.connections`.
This is a pool group, which means you give it a connection instance,
and you get a pool instance back. We have one pool per connection
instance to support multiple connections in the same app.
All connection instances with the same connection parameters will
get the same pool::
>>> from kombu import Connection
>>> from kombu.pools import connections
>>> connections[Connection('redis://localhost:6379')]
<kombu.connection.ConnectionPool object at 0x101805650>
>>> connections[Connection('redis://localhost:6379')]
<kombu.connection.ConnectionPool object at 0x101805650>
Let's acquire and release a connection:
.. code-block:: python
from kombu import Connection
from kombu.pools import connections
connection = Connection('redis://localhost:6379')
with connections[connection].acquire(block=True) as conn:
print('Got connection: %r' % (connection.as_uri(), ))
.. note::
The ``block=True`` here means that the acquire call will block
until a connection is available in the pool.
Note that this will block forever in case there is a deadlock
in your code where a connection is not released. There
is a ``timeout`` argument you can use to safeguard against this
(see :meth:`kombu.connection.Resource.acquire`).
If blocking is disabled and there aren't any connections
left in the pool an :class:`kombu.exceptions.ConnectionLimitExceeded`
exception will be raised.
That's about it. If you need to connect to multiple brokers
at once you can do that too:
.. code-block:: python
from kombu import Connection
from kombu.pools import connections
c1 = Connection('amqp://')
c2 = Connection('redis://')
with connections[c1].acquire(block=True) as conn1:
with connections[c2].acquire(block=True) as conn2:
# ....
.. _default-producers:
The producer pool group
=======================
This is a pool group just like the connections, except
that it manages :class:`~kombu.Producer` instances
used to publish messages.
Here is an example using the producer pool to publish a message
to the ``news`` exchange:
.. code-block:: python
from kombu import Connection, Exchange
from kombu.common import maybe_declare
from kombu.pools import producers
# The exchange we send our news articles to.
news_exchange = Exchange('news')
# The article we want to send
article = {'title': 'No cellular coverage on the tube for 2012',
'ingress': 'yadda yadda yadda'}
# The broker where our exchange is.
connection = Connection('amqp://guest:guest@localhost:5672//')
with producers[connection].acquire(block=True) as producer:
# maybe_declare knows what entities have already been declared
# so we don't have to do so multiple times in the same process.
maybe_declare(news_exchange)
producer.publish(article, routing_key='domestic',
serializer='json',
compression='zlib')
.. _default-pool-limits:
Setting pool limits
-------------------
By default every connection instance has a limit of 200 connections.
You can change this limit using :func:`kombu.pools.set_limit`.
You are able to grow the pool at runtime, but you can't shrink it,
so it is best to set the limit as early as possible after your application
starts::
>>> from kombu import pools
>>> pools.set_limit()
Resetting all pools
-------------------
You can close all active connections and reset all pool groups by
using the :func:`kombu.pools.reset` function. Note that this
will not respect anything currently using these connections,
so will just drag the connections away from under their feet:
you should be very careful before you use this.
Kombu will reset the pools if the process is forked,
so that forked processes start with clean pool groups.
.. _custom-pool-groups:
Custom Pool Groups
==================
To maintain your own pool groups you should create your own
:class:`~kombu.pools.Connections` and :class:`kombu.pools.Producers`
instances:
.. code-block:: python
from kombu import pools
from kombu import Connection
connections = pools.Connection(limit=100)
producers = pools.Producers(limit=connections.limit)
connection = Connection('amqp://guest:guest@localhost:5672//')
with connections[connection].acquire(block=True):
# ...
If you want to use the global limit that can be set with
:func:`~kombu.pools.set_limit` you can use a special value as the ``limit``
argument:
.. code-block:: python
from kombu import pools
connections = pools.Connections(limit=pools.use_default_limit)

View File

@ -0,0 +1,24 @@
.. _guide-producers:
===========
Producers
===========
.. _producer-basics:
Basics
======
Serialization
=============
See :ref:`guide-serialization`.
Reference
=========
.. autoclass:: kombu.Producer
:noindex:
:members:

View File

@ -0,0 +1,184 @@
.. _guide-serialization:
===============
Serialization
===============
.. _serializers:
Serializers
===========
By default every message is encoded using `JSON`_, so sending
Python data structures like dictionaries and lists works.
`YAML`_, `msgpack`_ and Python's built-in `pickle` module is also supported,
and if needed you can register any custom serialization scheme you
want to use.
By default Kombu will only load JSON messages, so if you want
to use other serialization format you must explicitly enable
them in your consumer by using the ``accept`` argument:
.. code-block:: python
Consumer(conn, [queue], accept=['json', 'pickle', 'msgpack'])
The accept argument can also include MIME-types.
.. _`JSON`: http://www.json.org/
.. _`YAML`: http://yaml.org/
.. _`msgpack`: http://msgpack.sourceforge.net/
Each option has its advantages and disadvantages.
`json` -- JSON is supported in many programming languages, is now
a standard part of Python (since 2.6), and is fairly fast to
decode using the modern Python libraries such as `cjson` or
`simplejson`.
The primary disadvantage to `JSON` is that it limits you to
the following data types: strings, Unicode, floats, boolean,
dictionaries, and lists. Decimals and dates are notably missing.
Also, binary data will be transferred using Base64 encoding, which
will cause the transferred data to be around 34% larger than an
encoding which supports native binary types.
However, if your data fits inside the above constraints and
you need cross-language support, the default setting of `JSON`
is probably your best choice.
`pickle` -- If you have no desire to support any language other than
Python, then using the `pickle` encoding will gain you
the support of all built-in Python data types (except class instances),
smaller messages when sending binary files, and a slight speedup
over `JSON` processing.
.. admonition:: Pickle and Security
The pickle format is very convenient as it can serialize
and deserialize almost any object, but this is also a concern
for security.
Carefully crafted pickle payloads can do almost anything
a regular Python program can do, so if you let your consumer
automatically decode pickled objects you must make sure
to limit access to the broker so that untrusted
parties do not have the ability to send messages!
By default Kombu uses pickle protocol 2, but this can be changed
using the :envvar:`PICKLE_PROTOCOL` environment variable or by changing
the global :data:`kombu.serialization.pickle_protocol` flag.
`yaml` -- YAML has many of the same characteristics as `json`,
except that it natively supports more data types (including dates,
recursive references, etc.)
However, the Python libraries for YAML are a good bit slower
than the libraries for JSON.
If you need a more expressive set of data types and need to maintain
cross-language compatibility, then `YAML` may be a better fit
than the above.
To instruct `Kombu` to use an alternate serialization method,
use one of the following options.
1. Set the serialization option on a per-producer basis::
>>> producer = Producer(channel,
... exchange=exchange,
... serializer="yaml")
2. Set the serialization option per message::
>>> producer.publish(message, routing_key=rkey,
... serializer="pickle")
Note that a `Consumer` do not need the serialization method specified.
They can auto-detect the serialization method as the
content-type is sent as a message header.
.. _sending-raw-data:
Sending raw data without Serialization
======================================
In some cases, you don't need your message data to be serialized. If you
pass in a plain string or Unicode object as your message, then `Kombu` will
not waste cycles serializing/deserializing the data.
You can optionally specify a `content_type` and `content_encoding`
for the raw data::
>>> with open("~/my_picture.jpg", "rb") as fh:
... producer.publish(fh.read(),
content_type="image/jpeg",
content_encoding="binary",
routing_key=rkey)
The `Message` object returned by the `Consumer` class will have a
`content_type` and `content_encoding` attribute.
.. _serialization-entrypoints:
Creating extensions using Setuptools entry-points
=================================================
A package can also register new serializers using Setuptools
entry-points.
The entry-point must provide the name of the serializer along
with the path to a tuple providing the rest of the args:
``decoder_function, encoder_function, content_type, content_encoding``.
An example entrypoint could be:
.. code-block:: python
from setuptools import setup
setup(
entry_points={
'kombu.serializers': [
'my_serializer = my_module.serializer:register_args'
]
}
)
Then the module ``my_module.serializer`` would look like:
.. code-block:: python
register_args = (my_decoder, my_encoder, 'application/x-mimetype', 'utf-8')
When this package is installed the new 'my_serializer' serializer will be
supported by Kombu.
.. admonition:: Buffer Objects
The decoder function of custom serializer must support both strings
and Python's old-style buffer objects.
Python pickle and json modules usually don't do this via its ``loads``
function, but you can easily add support by making a wrapper around the
``load`` function that takes file objects instead of strings.
Here's an example wrapping :func:`pickle.loads` in such a way:
.. code-block:: python
import pickle
from kombu.serialization import BytesIO, register
def loads(s):
return pickle.load(BytesIO(s))
register('my_pickle', pickle.dumps, loads,
content_type='application/x-pickle2',
content_encoding='binary')

116
docs/userguide/simple.rst Normal file
View File

@ -0,0 +1,116 @@
.. _guide-simple:
==================
Simple Interface
==================
.. contents::
:local:
:mod:`kombu.simple` is a simple interface to AMQP queueing.
It is only slightly different from the :class:`~Queue.Queue` class in the
Python Standard Library, which makes it excellent for users with basic
messaging needs.
Instead of defining exchanges and queues, the simple classes only requires
two arguments, a connection channel and a name. The name is used as the
queue, exchange and routing key. If the need arises, you can specify
a :class:`~kombu.Queue` as the name argument instead.
In addition, the :class:`~kombu.Connection` comes with
shortcuts to create simple queues using the current connection:
.. code-block:: python
>>> queue = connection.SimpleQueue('myqueue')
>>> # ... do something with queue
>>> queue.close()
This is equivalent to:
.. code-block:: python
>>> from kombu import SimpleQueue, SimpleBuffer
>>> channel = connection.channel()
>>> queue = SimpleBuffer(channel)
>>> # ... do something with queue
>>> channel.close()
>>> queue.close()
.. _simple-send-receive:
Sending and receiving messages
==============================
The simple interface defines two classes; :class:`~kombu.simple.SimpleQueue`,
and :class:`~kombu.simple.SimpleBuffer`. The former is used for persistent
messages, and the latter is used for transient, buffer-like queues.
They both have the same interface, so you can use them interchangeably.
Here is an example using the :class:`~kombu.simple.SimpleQueue` class
to produce and consume logging messages:
.. code-block:: python
import socket
import datetime
from time import time
from kombu import Connection
class Logger(object):
def __init__(self, connection, queue_name='log_queue',
serializer='json', compression=None):
self.queue = connection.SimpleQueue(queue_name)
self.serializer = serializer
self.compression = compression
def log(self, message, level='INFO', context={}):
self.queue.put({'message': message,
'level': level,
'context': context,
'hostname': socket.gethostname(),
'timestamp': time()},
serializer=self.serializer,
compression=self.compression)
def process(self, callback, n=1, timeout=1):
for i in xrange(n):
log_message = self.queue.get(block=True, timeout=1)
entry = log_message.payload # deserialized data.
callback(entry)
log_message.ack() # remove message from queue
def close(self):
self.queue.close()
if __name__ == '__main__':
from contextlib import closing
with Connection('amqp://guest:guest@localhost:5672//') as conn:
with closing(Logger(conn)) as logger:
# Send message
logger.log('Error happened while encoding video',
level='ERROR',
context={'filename': 'cutekitten.mpg'})
# Consume and process message
# This is the callback called when a log message is
# received.
def dump_entry(entry):
date = datetime.datetime.fromtimestamp(entry['timestamp'])
print('[%s %s %s] %s %r' % (date,
entry['hostname'],
entry['level'],
entry['message'],
entry['context']))
# Process a single message using the callback above.
logger.process(dump_entry, n=1)

View File

@ -0,0 +1,41 @@
"""
Example of simple consumer that waits for a single message, acknowledges it
and exits.
"""
from kombu import Connection, Exchange, Queue, Consumer, eventloop
from pprint import pformat
#: By default messages sent to exchanges are persistent (delivery_mode=2),
#: and queues and exchanges are durable.
exchange = Exchange('kombu_demo', type='direct')
queue = Queue('kombu_demo', exchange, routing_key='kombu_demo')
def pretty(obj):
return pformat(obj, indent=4)
#: This is the callback applied when a message is received.
def handle_message(body, message):
print('Received message: %r' % (body, ))
print(' properties:\n%s' % (pretty(message.properties), ))
print(' delivery_info:\n%s' % (pretty(message.delivery_info), ))
message.ack()
#: Create a connection and a channel.
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
with Connection('amqp://guest:guest@localhost:5672//') as connection:
#: Create consumer using our callback and queue.
#: Second argument can also be a list to consume from
#: any number of queues.
with Consumer(connection, queue, callbacks=[handle_message]):
#: Each iteration waits for a single event. Note that this
#: event may not be a message, or a message that is to be
#: delivered to the consumers channel, but any event received
#: on the connection.
for _ in eventloop(connection):
pass

30
examples/complete_send.py Normal file
View File

@ -0,0 +1,30 @@
"""
Example producer that sends a single message and exits.
You can use `complete_receive.py` to receive the message sent.
"""
from kombu import Connection, Producer, Exchange, Queue
#: By default messages sent to exchanges are persistent (delivery_mode=2),
#: and queues and exchanges are durable.
exchange = Exchange('kombu_demo', type='direct')
queue = Queue('kombu_demo', exchange, routing_key='kombu_demo')
with Connection('amqp://guest:guest@localhost:5672//') as connection:
#: Producers are used to publish messages.
#: a default exchange and routing key can also be specifed
#: as arguments the Producer, but we rather specify this explicitly
#: at the publish call.
producer = Producer(connection)
#: Publish the message using the json serializer (which is the default),
#: and zlib compression. The kombu consumer will automatically detect
#: encoding, serialization and compression used and decode accordingly.
producer.publish({'hello': 'world'},
exchange=exchange,
routing_key='kombu_demo',
serializer='json', compression='zlib')

View File

@ -0,0 +1,29 @@
#!/usr/bin/env python
from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.async import Hub
from threading import Event
hub = Hub()
exchange = Exchange('asynt')
queue = Queue('asynt', exchange, 'asynt')
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt')
print('MESSAGE SENT')
def on_message(message):
print('RECEIVED: %r' % (message.body, ))
message.ack()
hub.stop() # <-- exit after one message
if __name__ == '__main__':
conn = Connection('amqp://')
conn.register_with_event_loop(hub)
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.run_forever()

View File

@ -0,0 +1,8 @@
from kombu import Connection
with Connection('amqp://guest:guest@localhost:5672//') as conn:
simple_queue = conn.SimpleQueue('simple_queue')
message = simple_queue.get(block=True, timeout=1)
print("Received: %s" % message.payload)
message.ack()
simple_queue.close()

View File

@ -0,0 +1,9 @@
from kombu import Connection
import datetime
with Connection('amqp://guest:guest@localhost:5672//') as conn:
simple_queue = conn.SimpleQueue('simple_queue')
message = 'helloword, sent at %s' % datetime.datetime.today()
simple_queue.put(message)
print('Sent: %s' % message)
simple_queue.close()

View File

@ -0,0 +1,39 @@
"""
Example that sends a single message and exits using the simple interface.
You can use `simple_receive.py` (or `complete_receive.py`) to receive the
message sent.
"""
import eventlet
from kombu import Connection
eventlet.monkey_patch()
def wait_many(timeout=1):
#: Create connection
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
with Connection('amqp://guest:guest@localhost:5672//') as connection:
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
#: If a name, then the queue will be declared with the name as the
#: queue name, exchange name and routing key.
with connection.SimpleQueue('kombu_demo') as queue:
while True:
try:
message = queue.get(block=False, timeout=timeout)
except queue.Empty:
break
else:
message.ack()
print(message.payload)
eventlet.spawn(wait_many).wait()

View File

@ -0,0 +1,40 @@
"""
Example that sends a single message and exits using the simple interface.
You can use `simple_receive.py` (or `complete_receive.py`) to receive the
message sent.
"""
import eventlet
from kombu import Connection
eventlet.monkey_patch()
def send_many(n):
#: Create connection
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
with Connection('amqp://guest:guest@localhost:5672//') as connection:
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
#: If a name, then the queue will be declared with the name as the
#: queue name, exchange name and routing key.
with connection.SimpleQueue('kombu_demo') as queue:
def send_message(i):
queue.put({'hello': 'world%s' % (i, )})
pool = eventlet.GreenPool(10)
for i in range(n):
pool.spawn(send_message, i)
pool.waitall()
if __name__ == '__main__':
send_many(10)

View File

@ -0,0 +1,26 @@
"""
Example receiving a message using the SimpleQueue interface.
"""
from kombu import Connection
#: Create connection
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
with Connection('amqp://guest:guest@localhost:5672//') as conn:
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
#: If a name, then the queue will be declared with the name as the queue
#: name, exchange name and routing key.
with conn.SimpleQueue('kombu_demo') as queue:
message = queue.get(block=True, timeout=10)
message.ack()
print(message.payload)
####
#: If you don't use the with statement then you must aways
# remember to close objects after use:
# queue.close()
# connection.close()

Some files were not shown because too many files have changed in this diff Show More