456 lines
14 KiB
Python
456 lines
14 KiB
Python
"""
|
|
kombu.serialization
|
|
===================
|
|
|
|
Serialization utilities.
|
|
|
|
"""
|
|
from __future__ import absolute_import
|
|
|
|
import codecs
|
|
import os
|
|
import sys
|
|
|
|
import pickle as pypickle
|
|
try:
|
|
import cPickle as cpickle
|
|
except ImportError: # pragma: no cover
|
|
cpickle = None # noqa
|
|
|
|
from collections import namedtuple
|
|
from contextlib import contextmanager
|
|
|
|
from .exceptions import (
|
|
ContentDisallowed, DecodeError, EncodeError, SerializerNotInstalled
|
|
)
|
|
from .five import BytesIO, reraise, text_t
|
|
from .utils import entrypoints
|
|
from .utils.encoding import str_to_bytes, bytes_t
|
|
|
|
__all__ = ['pickle', 'loads', 'dumps', 'register', 'unregister']
|
|
SKIP_DECODE = frozenset(['binary', 'ascii-8bit'])
|
|
TRUSTED_CONTENT = frozenset(['application/data', 'application/text'])
|
|
|
|
if sys.platform.startswith('java'): # pragma: no cover
|
|
|
|
def _decode(t, coding):
|
|
return codecs.getdecoder(coding)(t)[0]
|
|
else:
|
|
_decode = codecs.decode
|
|
|
|
pickle = cpickle or pypickle
|
|
pickle_load = pickle.load
|
|
|
|
#: Kombu requires Python 2.5 or later so we use protocol 2 by default.
|
|
#: There's a new protocol (3) but this is only supported by Python 3.
|
|
pickle_protocol = int(os.environ.get('PICKLE_PROTOCOL', 2))
|
|
|
|
codec = namedtuple('codec', ('content_type', 'content_encoding', 'encoder'))
|
|
|
|
|
|
@contextmanager
|
|
def _reraise_errors(wrapper,
|
|
include=(Exception, ), exclude=(SerializerNotInstalled, )):
|
|
try:
|
|
yield
|
|
except exclude:
|
|
raise
|
|
except include as exc:
|
|
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
|
|
|
|
|
|
def pickle_loads(s, load=pickle_load):
|
|
# used to support buffer objects
|
|
return load(BytesIO(s))
|
|
|
|
|
|
def parenthesize_alias(first, second):
|
|
return '%s (%s)' % (first, second) if first else second
|
|
|
|
|
|
class SerializerRegistry(object):
|
|
"""The registry keeps track of serialization methods."""
|
|
|
|
def __init__(self):
|
|
self._encoders = {}
|
|
self._decoders = {}
|
|
self._default_encode = None
|
|
self._default_content_type = None
|
|
self._default_content_encoding = None
|
|
self._disabled_content_types = set()
|
|
self.type_to_name = {}
|
|
self.name_to_type = {}
|
|
|
|
def register(self, name, encoder, decoder, content_type,
|
|
content_encoding='utf-8'):
|
|
if encoder:
|
|
self._encoders[name] = codec(
|
|
content_type, content_encoding, encoder,
|
|
)
|
|
if decoder:
|
|
self._decoders[content_type] = decoder
|
|
self.type_to_name[content_type] = name
|
|
self.name_to_type[name] = content_type
|
|
|
|
def enable(self, name):
|
|
if '/' not in name:
|
|
name = self.name_to_type[name]
|
|
self._disabled_content_types.discard(name)
|
|
|
|
def disable(self, name):
|
|
if '/' not in name:
|
|
name = self.name_to_type[name]
|
|
self._disabled_content_types.add(name)
|
|
|
|
def unregister(self, name):
|
|
try:
|
|
content_type = self.name_to_type[name]
|
|
self._decoders.pop(content_type, None)
|
|
self._encoders.pop(name, None)
|
|
self.type_to_name.pop(content_type, None)
|
|
self.name_to_type.pop(name, None)
|
|
except KeyError:
|
|
raise SerializerNotInstalled(
|
|
'No encoder/decoder installed for {0}'.format(name))
|
|
|
|
def _set_default_serializer(self, name):
|
|
"""
|
|
Set the default serialization method used by this library.
|
|
|
|
:param name: The name of the registered serialization method.
|
|
For example, `json` (default), `pickle`, `yaml`, `msgpack`,
|
|
or any custom methods registered using :meth:`register`.
|
|
|
|
:raises SerializerNotInstalled: If the serialization method
|
|
requested is not available.
|
|
"""
|
|
try:
|
|
(self._default_content_type, self._default_content_encoding,
|
|
self._default_encode) = self._encoders[name]
|
|
except KeyError:
|
|
raise SerializerNotInstalled(
|
|
'No encoder installed for {0}'.format(name))
|
|
|
|
def dumps(self, data, serializer=None):
|
|
if serializer == 'raw':
|
|
return raw_encode(data)
|
|
if serializer and not self._encoders.get(serializer):
|
|
raise SerializerNotInstalled(
|
|
'No encoder installed for {0}'.format(serializer))
|
|
|
|
# If a raw string was sent, assume binary encoding
|
|
# (it's likely either ASCII or a raw binary file, and a character
|
|
# set of 'binary' will encompass both, even if not ideal.
|
|
if not serializer and isinstance(data, bytes_t):
|
|
# In Python 3+, this would be "bytes"; allow binary data to be
|
|
# sent as a message without getting encoder errors
|
|
return 'application/data', 'binary', data
|
|
|
|
# For Unicode objects, force it into a string
|
|
if not serializer and isinstance(data, text_t):
|
|
with _reraise_errors(EncodeError, exclude=()):
|
|
payload = data.encode('utf-8')
|
|
return 'text/plain', 'utf-8', payload
|
|
|
|
if serializer:
|
|
content_type, content_encoding, encoder = \
|
|
self._encoders[serializer]
|
|
else:
|
|
encoder = self._default_encode
|
|
content_type = self._default_content_type
|
|
content_encoding = self._default_content_encoding
|
|
|
|
with _reraise_errors(EncodeError):
|
|
payload = encoder(data)
|
|
return content_type, content_encoding, payload
|
|
encode = dumps # XXX compat
|
|
|
|
def loads(self, data, content_type, content_encoding,
|
|
accept=None, force=False, _trusted_content=TRUSTED_CONTENT):
|
|
content_type = content_type or 'application/data'
|
|
if accept is not None:
|
|
if content_type not in _trusted_content \
|
|
and content_type not in accept:
|
|
raise self._for_untrusted_content(content_type, 'untrusted')
|
|
else:
|
|
if content_type in self._disabled_content_types and not force:
|
|
raise self._for_untrusted_content(content_type, 'disabled')
|
|
content_encoding = (content_encoding or 'utf-8').lower()
|
|
|
|
if data:
|
|
decode = self._decoders.get(content_type)
|
|
if decode:
|
|
with _reraise_errors(DecodeError):
|
|
return decode(data)
|
|
if content_encoding not in SKIP_DECODE and \
|
|
not isinstance(data, text_t):
|
|
with _reraise_errors(DecodeError):
|
|
return _decode(data, content_encoding)
|
|
return data
|
|
decode = loads # XXX compat
|
|
|
|
def _for_untrusted_content(self, ctype, why):
|
|
return ContentDisallowed(
|
|
'Refusing to deserialize {0} content of type {1}'.format(
|
|
why,
|
|
parenthesize_alias(self.type_to_name.get(ctype, ctype), ctype),
|
|
),
|
|
)
|
|
|
|
|
|
#: Global registry of serializers/deserializers.
|
|
registry = SerializerRegistry()
|
|
|
|
|
|
"""
|
|
.. function:: dumps(data, serializer=default_serializer)
|
|
|
|
Serialize a data structure into a string suitable for sending
|
|
as an AMQP message body.
|
|
|
|
:param data: The message data to send. Can be a list,
|
|
dictionary or a string.
|
|
|
|
:keyword serializer: An optional string representing
|
|
the serialization method you want the data marshalled
|
|
into. (For example, `json`, `raw`, or `pickle`).
|
|
|
|
If :const:`None` (default), then json will be used, unless
|
|
`data` is a :class:`str` or :class:`unicode` object. In this
|
|
latter case, no serialization occurs as it would be
|
|
unnecessary.
|
|
|
|
Note that if `serializer` is specified, then that
|
|
serialization method will be used even if a :class:`str`
|
|
or :class:`unicode` object is passed in.
|
|
|
|
:returns: A three-item tuple containing the content type
|
|
(e.g., `application/json`), content encoding, (e.g.,
|
|
`utf-8`) and a string containing the serialized
|
|
data.
|
|
|
|
:raises SerializerNotInstalled: If the serialization method
|
|
requested is not available.
|
|
"""
|
|
dumps = encode = registry.encode # XXX encode is a compat alias
|
|
|
|
"""
|
|
.. function:: loads(data, content_type, content_encoding):
|
|
|
|
Deserialize a data stream as serialized using `dumps`
|
|
based on `content_type`.
|
|
|
|
:param data: The message data to deserialize.
|
|
|
|
:param content_type: The content-type of the data.
|
|
(e.g., `application/json`).
|
|
|
|
:param content_encoding: The content-encoding of the data.
|
|
(e.g., `utf-8`, `binary`, or `us-ascii`).
|
|
|
|
:returns: The unserialized data.
|
|
|
|
"""
|
|
loads = decode = registry.decode # XXX decode is a compat alias
|
|
|
|
|
|
"""
|
|
.. function:: register(name, encoder, decoder, content_type,
|
|
content_encoding='utf-8'):
|
|
Register a new encoder/decoder.
|
|
|
|
:param name: A convenience name for the serialization method.
|
|
|
|
:param encoder: A method that will be passed a python data structure
|
|
and should return a string representing the serialized data.
|
|
If :const:`None`, then only a decoder will be registered. Encoding
|
|
will not be possible.
|
|
|
|
:param decoder: A method that will be passed a string representing
|
|
serialized data and should return a python data structure.
|
|
If :const:`None`, then only an encoder will be registered.
|
|
Decoding will not be possible.
|
|
|
|
:param content_type: The mime-type describing the serialized
|
|
structure.
|
|
|
|
:param content_encoding: The content encoding (character set) that
|
|
the `decoder` method will be returning. Will usually be
|
|
`utf-8`, `us-ascii`, or `binary`.
|
|
|
|
"""
|
|
register = registry.register
|
|
|
|
|
|
"""
|
|
.. function:: unregister(name):
|
|
Unregister registered encoder/decoder.
|
|
|
|
:param name: Registered serialization method name.
|
|
|
|
"""
|
|
unregister = registry.unregister
|
|
|
|
|
|
def raw_encode(data):
|
|
"""Special case serializer."""
|
|
content_type = 'application/data'
|
|
payload = data
|
|
if isinstance(payload, text_t):
|
|
content_encoding = 'utf-8'
|
|
with _reraise_errors(EncodeError, exclude=()):
|
|
payload = payload.encode(content_encoding)
|
|
else:
|
|
content_encoding = 'binary'
|
|
return content_type, content_encoding, payload
|
|
|
|
|
|
def register_json():
|
|
"""Register a encoder/decoder for JSON serialization."""
|
|
from anyjson import loads as json_loads, dumps as json_dumps
|
|
|
|
def _loads(obj):
|
|
if isinstance(obj, bytes_t):
|
|
obj = obj.decode()
|
|
return json_loads(obj)
|
|
|
|
registry.register('json', json_dumps, _loads,
|
|
content_type='application/json',
|
|
content_encoding='utf-8')
|
|
|
|
|
|
def register_yaml():
|
|
"""Register a encoder/decoder for YAML serialization.
|
|
|
|
It is slower than JSON, but allows for more data types
|
|
to be serialized. Useful if you need to send data such as dates"""
|
|
try:
|
|
import yaml
|
|
registry.register('yaml', yaml.safe_dump, yaml.safe_load,
|
|
content_type='application/x-yaml',
|
|
content_encoding='utf-8')
|
|
except ImportError:
|
|
|
|
def not_available(*args, **kwargs):
|
|
"""In case a client receives a yaml message, but yaml
|
|
isn't installed."""
|
|
raise SerializerNotInstalled(
|
|
'No decoder installed for YAML. Install the PyYAML library')
|
|
registry.register('yaml', None, not_available, 'application/x-yaml')
|
|
|
|
|
|
if sys.version_info[0] == 3: # pragma: no cover
|
|
|
|
def unpickle(s):
|
|
return pickle_loads(str_to_bytes(s))
|
|
|
|
else:
|
|
unpickle = pickle_loads # noqa
|
|
|
|
|
|
def register_pickle():
|
|
"""The fastest serialization method, but restricts
|
|
you to python clients."""
|
|
|
|
def pickle_dumps(obj, dumper=pickle.dumps):
|
|
return dumper(obj, protocol=pickle_protocol)
|
|
|
|
registry.register('pickle', pickle_dumps, unpickle,
|
|
content_type='application/x-python-serialize',
|
|
content_encoding='binary')
|
|
|
|
|
|
def register_msgpack():
|
|
"""See http://msgpack.sourceforge.net/"""
|
|
try:
|
|
try:
|
|
from msgpack import packb as pack, unpackb
|
|
unpack = lambda s: unpackb(s, encoding='utf-8')
|
|
except ImportError:
|
|
# msgpack < 0.2.0 and Python 2.5
|
|
from msgpack import packs as pack, unpacks as unpack # noqa
|
|
registry.register(
|
|
'msgpack', pack, unpack,
|
|
content_type='application/x-msgpack',
|
|
content_encoding='binary')
|
|
except (ImportError, ValueError):
|
|
|
|
def not_available(*args, **kwargs):
|
|
"""In case a client receives a msgpack message, but yaml
|
|
isn't installed."""
|
|
raise SerializerNotInstalled(
|
|
'No decoder installed for msgpack. '
|
|
'Please install the msgpack library')
|
|
registry.register('msgpack', None, not_available,
|
|
'application/x-msgpack')
|
|
|
|
# Register the base serialization methods.
|
|
register_json()
|
|
register_pickle()
|
|
register_yaml()
|
|
register_msgpack()
|
|
|
|
# Default serializer is 'json'
|
|
registry._set_default_serializer('json')
|
|
|
|
|
|
_setupfuns = {
|
|
'json': register_json,
|
|
'pickle': register_pickle,
|
|
'yaml': register_yaml,
|
|
'msgpack': register_msgpack,
|
|
'application/json': register_json,
|
|
'application/x-yaml': register_yaml,
|
|
'application/x-python-serialize': register_pickle,
|
|
'application/x-msgpack': register_msgpack,
|
|
}
|
|
|
|
|
|
def enable_insecure_serializers(choices=['pickle', 'yaml', 'msgpack']):
|
|
"""Enable serializers that are considered to be unsafe.
|
|
|
|
Will enable ``pickle``, ``yaml`` and ``msgpack`` by default,
|
|
but you can also specify a list of serializers (by name or content type)
|
|
to enable.
|
|
|
|
"""
|
|
for choice in choices:
|
|
try:
|
|
registry.enable(choice)
|
|
except KeyError:
|
|
pass
|
|
|
|
|
|
def disable_insecure_serializers(allowed=['json']):
|
|
"""Disable untrusted serializers.
|
|
|
|
Will disable all serializers except ``json``
|
|
or you can specify a list of deserializers to allow.
|
|
|
|
.. note::
|
|
|
|
Producers will still be able to serialize data
|
|
in these formats, but consumers will not accept
|
|
incoming data using the untrusted content types.
|
|
|
|
"""
|
|
for name in registry._decoders:
|
|
registry.disable(name)
|
|
if allowed is not None:
|
|
for name in allowed:
|
|
registry.enable(name)
|
|
|
|
|
|
# Insecure serializers are disabled by default since v3.0
|
|
disable_insecure_serializers()
|
|
|
|
# Load entrypoints from installed extensions
|
|
for ep, args in entrypoints('kombu.serializers'): # pragma: no cover
|
|
register(ep.name, *args)
|
|
|
|
|
|
def prepare_accept_content(l, name_to_type=registry.name_to_type):
|
|
if l is not None:
|
|
return set(n if '/' in n else name_to_type[n] for n in l)
|
|
return l
|