Import python-requests-toolbelt_0.8.0.orig.tar.gz

[dgit import orig python-requests-toolbelt_0.8.0.orig.tar.gz]
Petter Reinholdtsen 2017-07-16 11:19:44 +02:00
commit 64e3139528
100 changed files with 9400 additions and 0 deletions

AUTHORS.rst Normal file

@@ -0,0 +1,43 @@
Requests-toolbelt is written and maintained by Ian Cordasco, Cory Benfield and
various contributors:
Development Lead
````````````````
- Ian Cordasco
- Cory Benfield
Requests
````````
- Kenneth Reitz <me@kennethreitz.com> and various contributors
Urllib3
```````
- Andrey Petrov <andrey.petrov@shazow.net>
Patches and Suggestions
```````````````````````
- Jay De Lanoy <jay@delanoy.co>
- Zhaoyu Luo <luozhaoyu90@gmail.com>
- Markus Unterwaditzer <markus@unterwaditzer.net>
- Bryce Boe <bbzbryce@gmail.com> (@bboe)
- Dan Lipsitt (https://github.com/DanLipsitt)
- Cea Stapleton (http://www.ceastapleton.com)
- Patrick Creech <pcreech@redhat.com>
- Mike Lambert (@mikelambert)
- Ryan Barrett (https://snarfed.org/)

CODE_OF_CONDUCT.rst Normal file

@@ -0,0 +1,54 @@
Contributor Code of Conduct
---------------------------
As contributors and maintainers of this project, and in the interest of
fostering an open and welcoming community, we pledge to respect all
people who contribute through reporting issues, posting feature
requests, updating documentation, submitting pull requests or patches,
and other activities.
We are committed to making participation in this project a
harassment-free experience for everyone, regardless of level of
experience, gender, gender identity and expression, sexual orientation,
disability, personal appearance, body size, race, ethnicity, age,
religion, or nationality.
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery
* Personal attacks
* Trolling or insulting/derogatory comments
* Public or private harassment
* Publishing others' private information, such as physical or electronic
addresses, without explicit permission
* Other unethical or unprofessional conduct
Project maintainers have the right and responsibility to remove, edit,
or reject comments, commits, code, wiki edits, issues, and other
contributions that are not aligned to this Code of Conduct, or to ban
temporarily or permanently any contributor for other behaviors that they
deem inappropriate, threatening, offensive, or harmful.
By adopting this Code of Conduct, project maintainers commit themselves
to fairly and consistently applying these principles to every aspect of
managing this project. Project maintainers who do not follow or enforce
the Code of Conduct may be permanently removed from the project team.
This code of conduct applies both within project spaces and in public
spaces when an individual is representing the project or its community.
Instances of abusive, harassing, or otherwise unacceptable behavior may
be reported by contacting a project maintainer at graffatcolmingov@gmail.com.
All complaints will be reviewed and investigated and will
result in a response that is deemed necessary and appropriate to the
circumstances. Maintainers are obligated to maintain confidentiality
with regard to the reporter of an incident.
This Code of Conduct is adapted from the `Contributor Covenant`_, version
1.3.0, available at http://contributor-covenant.org/version/1/3/0/
.. _Contributor Covenant: http://contributor-covenant.org
..
Re-formatted to reStructuredText from
https://raw.githubusercontent.com/CoralineAda/contributor_covenant/master/CODE_OF_CONDUCT.md

HISTORY.rst Normal file

@@ -0,0 +1,297 @@
History
=======
0.8.0 -- 2017-05-20
-------------------
More information about this release can be found on the `0.8.0 milestone`_.
New Features
~~~~~~~~~~~~
- Add ``UserAgentBuilder`` to provide more control over generated User-Agent
strings.
Fixed Bugs
~~~~~~~~~~
- Include ``_validate_certificate`` in the list of picked attributes on the
``AppEngineAdapter``.
- Fix a backwards incompatibility in ``get_encodings_from_content``.
.. _0.8.0 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestones/0.8.0
0.7.1 -- 2017-02-13
-------------------
More information about this release can be found on the `0.7.1 milestone`_.
Fixed Bugs
~~~~~~~~~~
- Fixed monkey-patching for the AppEngineAdapter.
- Make it easier to disable certificate verification when monkey-patching
AppEngine.
- Handle ``multipart/form-data`` bodies without a trailing ``CRLF``.
.. links
.. _0.7.1 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestone/9
0.7.0 -- 2016-07-21
-------------------
More information about this release can be found on the `0.7.0 milestone`_.
New Features
~~~~~~~~~~~~
- Add ``BaseUrlSession`` to allow developers to have a session that has a
"Base" URL. See the documentation for more details and examples.
- Split the logic of ``stream_response_to_file`` into two separate functions:
* ``get_download_file_path`` to generate the file name from the Response.
* ``stream_response_to_file`` which will use ``get_download_file_path`` if
necessary.
Fixed Bugs
~~~~~~~~~~
- Fixed the issue for people using *very* old versions of Requests where they
would see an ImportError from ``requests_toolbelt._compat`` when trying to
import ``connection``.
.. _0.7.0 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestones/0.7.0
0.6.2 -- 2016-05-10
-------------------
Fixed Bugs
~~~~~~~~~~
- When passing a timeout via Requests, it was not appropriately translated to
the timeout that the urllib3 code was expecting.
0.6.1 -- 2016-05-05
-------------------
Fixed Bugs
~~~~~~~~~~
- Remove assertion about request URLs in the AppEngineAdapter.
- Prevent pip from installing requests 3.0.0 when that is released until we
are ready to handle it.
0.6.0 -- 2016-01-27
-------------------
More information about this release can be found on the `0.6.0 milestone`_.
New Features
~~~~~~~~~~~~
- Add ``AppEngineAdapter`` to support developers using Google's AppEngine
platform with Requests.
- Add ``GuessProxyAuth`` class to support guessing between Basic and Digest
Authentication for proxies.
Fixed Bugs
~~~~~~~~~~
- Ensure that proxies use the correct TLS version when using the
``SSLAdapter``.
- Fix an ``AttributeError`` when using the ``HTTPProxyDigestAuth`` class.
Miscellaneous
~~~~~~~~~~~~~
- Drop testing support for Python 3.2. virtualenv and pip have stopped
supporting it, meaning that it is harder to test for this with our CI
infrastructure. Moving forward we will make a best-effort attempt to
support 3.2 but will not test for it.
.. _0.6.0 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestones/0.6.0
0.5.1 -- 2015-12-16
-------------------
More information about this release can be found on the `0.5.1 milestone`_.
Fixed Bugs
~~~~~~~~~~
- Now papers over the differences in requests' ``super_len`` function between
versions prior to 2.9.0 and versions 2.9.0 and later.
.. _0.5.1 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestones/0.5.1
0.5.0 -- 2015-11-24
-------------------
More information about this release can be found on the `milestone
<https://github.com/sigmavirus24/requests-toolbelt/issues?utf8=%E2%9C%93&q=is%3Aall+milestone%3A0.5+>`_
for 0.5.0.
New Features
~~~~~~~~~~~~
- The ``tee`` submodule was added to ``requests_toolbelt.downloadutils``. It
allows you to iterate over the bytes of a response while also writing them
to a file. The ``tee.tee`` function expects you to pass an open file
object, while ``tee.tee_to_file`` will use the provided file name to open
the file for you.
- Added a new parameter to ``requests_toolbelt.utils.user_agent`` that allows
the user to specify additional items.
- Added nested form-data helper,
``requests_toolbelt.utils.formdata.urlencode``.
- Added the ``ForgetfulCookieJar`` to ``requests_toolbelt.cookies``.
- Added utilities for dumping the information about a request-response cycle
in ``requests_toolbelt.utils.dump``.
- Implemented the API described in the ``requests_toolbelt.threaded`` module
docstring, i.e., added ``requests_toolbelt.threaded.map`` as an available
function.
Fixed Bugs
~~~~~~~~~~
- Now papers over the API differences in versions of requests installed from
system packages versus versions of requests installed from PyPI.
- Allow string types for ``SourceAddressAdapter``.
0.4.0 -- 2015-04-03
-------------------
For more information about this release, please see `milestone 0.4.0
<https://github.com/sigmavirus24/requests-toolbelt/issues?q=milestone%3A0.4>`_
on the project's page.
New Features
~~~~~~~~~~~~
- A naive implementation of a thread pool is now included in the toolbelt. See
the docs in ``docs/threading.rst`` or on `Read The Docs
<https://toolbelt.readthedocs.org>`_.
- The ``StreamingIterator`` now accepts files (such as ``sys.stdin``) without
a specific length and will properly stream them.
- The ``MultipartEncoder`` now accepts exactly the same format of fields as
requests' ``files`` parameter does. In other words, you can now also pass in
extra headers to add to a part in the body. You can also now specify a
custom ``Content-Type`` for a part.
- An implementation of HTTP Digest Authentication for Proxies is now included.
- A transport adapter that allows a user to specify a specific Certificate
Fingerprint is now included in the toolbelt.
- A transport adapter that simplifies how users specify socket options is now
included.
- A transport adapter that simplifies how users can specify TCP Keep-Alive
options is now included in the toolbelt.
- Deprecated functions from ``requests.utils`` are now included and
maintained.
- An authentication tool that allows users to specify how to authenticate to
several different domains at once is now included.
- A function to save streamed responses to disk by analyzing the
``Content-Disposition`` header is now included in the toolbelt.
Fixed Bugs
~~~~~~~~~~
- The ``MultipartEncoder`` will now allow users to upload files larger than
4GB on 32-bit systems.
- The ``MultipartEncoder`` will now accept empty unicode strings for form
values.
0.3.1 -- 2014-06-23
-------------------
- Fix the fact that the 0.3.0 bundle did not include the ``StreamingIterator``.
0.3.0 -- 2014-05-21
-------------------
Bug Fixes
~~~~~~~~~
- Complete rewrite of ``MultipartEncoder`` fixes bug where bytes were lost in
uploads
New Features
~~~~~~~~~~~~
- ``MultipartDecoder`` to accept ``multipart/form-data`` response bodies and
parse them into an easy-to-use object.
- ``SourceAddressAdapter`` to allow users to choose a local address to bind
connections to.
- ``GuessAuth`` which accepts a username and password and uses the
``WWW-Authenticate`` header to determine how to authenticate against a
server.
- ``MultipartEncoderMonitor`` wraps an instance of the ``MultipartEncoder``
and keeps track of how many bytes were read and will call the provided
callback.
- ``StreamingIterator`` will wrap an iterator and stream the upload instead of
chunking it, provided you also provide the length of the content you wish to
upload.
0.2.0 -- 2014-02-24
-------------------
- Add ability to tell ``MultipartEncoder`` which encoding to use. By default
it uses 'utf-8'.
- Fix #10 - allow users to install with pip
- Fix #9 - Fix ``MultipartEncoder#to_string`` so that it properly handles file
objects as fields
0.1.2 -- 2014-01-19
-------------------
- At some point during development we broke how we handle normal file objects.
Thanks to @konomae this is now fixed.
0.1.1 -- 2014-01-19
-------------------
- Handle ``io.BytesIO``-like objects better
0.1.0 -- 2014-01-18
-------------------
- Add initial implementation of the streaming ``MultipartEncoder``
- Add initial implementation of the ``user_agent`` function
- Add the ``SSLAdapter``

LICENSE Normal file

@@ -0,0 +1,13 @@
Copyright 2014 Ian Cordasco, Cory Benfield
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

MANIFEST.in Normal file

@@ -0,0 +1,14 @@
include README.rst
include LICENSE
include HISTORY.rst
include AUTHORS.rst
include CODE_OF_CONDUCT.rst
include tox.ini
include dev-requirements.txt
recursive-include requests_toolbelt *
recursive-include docs *
recursive-include tests *
prune docs/_build
global-exclude *.py[cdo] __pycache__ *.so *.pyd

PKG-INFO Normal file

@@ -0,0 +1,434 @@
Metadata-Version: 1.1
Name: requests-toolbelt
Version: 0.8.0
Summary: A utility belt for advanced users of python-requests
Home-page: https://toolbelt.readthedocs.org
Author: Ian Cordasco, Cory Benfield
Author-email: graffatcolmingov@gmail.com
License: Apache 2.0
Description: requests toolbelt
=================
This is just a collection of utilities for `python-requests`_ that don't
really belong in ``requests`` proper. The minimum tested requests version is
``2.1.0``. In reality, the toolbelt should work with ``2.0.1`` as well, but
some idiosyncrasies prevent effective or sane testing on that version.
``pip install requests-toolbelt`` to get started!
multipart/form-data Encoder
---------------------------
The main attraction is a streaming multipart form-data object, ``MultipartEncoder``.
Its API looks like this:
.. code-block:: python
from requests_toolbelt import MultipartEncoder
import requests
m = MultipartEncoder(
fields={'field0': 'value', 'field1': 'value',
'field2': ('filename', open('file.py', 'rb'), 'text/plain')}
)
r = requests.post('http://httpbin.org/post', data=m,
headers={'Content-Type': m.content_type})
You can also use ``multipart/form-data`` encoding for requests that don't
require files:
.. code-block:: python
from requests_toolbelt import MultipartEncoder
import requests
m = MultipartEncoder(fields={'field0': 'value', 'field1': 'value'})
r = requests.post('http://httpbin.org/post', data=m,
headers={'Content-Type': m.content_type})
Or, you can just create the string and examine the data:
.. code-block:: python
# Assuming `m` is one of the above
m.to_string() # Always returns unicode
User-Agent constructor
----------------------
You can easily construct a requests-style ``User-Agent`` string::
from requests_toolbelt import user_agent
headers = {
'User-Agent': user_agent('my_package', '0.0.1')
}
r = requests.get('https://api.github.com/users', headers=headers)
SSLAdapter
----------
The ``SSLAdapter`` was originally published on `Cory Benfield's blog`_.
This adapter allows the user to choose one of the SSL protocols made available
in Python's ``ssl`` module for outgoing HTTPS connections:
.. code-block:: python
from requests_toolbelt import SSLAdapter
import requests
import ssl
s = requests.Session()
s.mount('https://', SSLAdapter(ssl.PROTOCOL_TLSv1))
cookies/ForgetfulCookieJar
--------------------------
The ``ForgetfulCookieJar`` prevents a particular requests session from storing
cookies:
.. code-block:: python
from requests_toolbelt.cookies.forgetful import ForgetfulCookieJar
session = requests.Session()
session.cookies = ForgetfulCookieJar()
Known Issues
------------
On Python 3.3.0 and 3.3.1, the standard library's ``http`` module will fail
when passed an instance of the ``MultipartEncoder``. This is fixed in later
minor releases of Python 3.3. Please consider upgrading to a later minor
version or Python 3.4. *There is absolutely nothing this library can do to
work around that bug.*
Contributing
------------
Please read the `suggested workflow
<https://toolbelt.readthedocs.org/en/latest/contributing.html>`_ for
contributing to this project.
.. _Cory Benfield's blog: https://lukasa.co.uk/2013/01/Choosing_SSL_Version_In_Requests/
.. _python-requests: https://github.com/kennethreitz/requests
History
=======
0.8.0 -- 2017-05-20
-------------------
More information about this release can be found on the `0.8.0 milestone`_.
New Features
~~~~~~~~~~~~
- Add ``UserAgentBuilder`` to provide more control over generated User-Agent
strings.
Fixed Bugs
~~~~~~~~~~
- Include ``_validate_certificate`` in the list of picked attributes on the
``AppEngineAdapter``.
- Fix a backwards incompatibility in ``get_encodings_from_content``.
.. _0.8.0 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestones/0.8.0
0.7.1 -- 2017-02-13
-------------------
More information about this release can be found on the `0.7.1 milestone`_.
Fixed Bugs
~~~~~~~~~~
- Fixed monkey-patching for the AppEngineAdapter.
- Make it easier to disable certificate verification when monkey-patching
AppEngine.
- Handle ``multipart/form-data`` bodies without a trailing ``CRLF``.
.. links
.. _0.7.1 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestone/9
0.7.0 -- 2016-07-21
-------------------
More information about this release can be found on the `0.7.0 milestone`_.
New Features
~~~~~~~~~~~~
- Add ``BaseUrlSession`` to allow developers to have a session that has a
"Base" URL. See the documentation for more details and examples.
- Split the logic of ``stream_response_to_file`` into two separate functions:
* ``get_download_file_path`` to generate the file name from the Response.
* ``stream_response_to_file`` which will use ``get_download_file_path`` if
necessary.
Fixed Bugs
~~~~~~~~~~
- Fixed the issue for people using *very* old versions of Requests where they
would see an ImportError from ``requests_toolbelt._compat`` when trying to
import ``connection``.
.. _0.7.0 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestones/0.7.0
0.6.2 -- 2016-05-10
-------------------
Fixed Bugs
~~~~~~~~~~
- When passing a timeout via Requests, it was not appropriately translated to
the timeout that the urllib3 code was expecting.
0.6.1 -- 2016-05-05
-------------------
Fixed Bugs
~~~~~~~~~~
- Remove assertion about request URLs in the AppEngineAdapter.
- Prevent pip from installing requests 3.0.0 when that is released until we
are ready to handle it.
0.6.0 -- 2016-01-27
-------------------
More information about this release can be found on the `0.6.0 milestone`_.
New Features
~~~~~~~~~~~~
- Add ``AppEngineAdapter`` to support developers using Google's AppEngine
platform with Requests.
- Add ``GuessProxyAuth`` class to support guessing between Basic and Digest
Authentication for proxies.
Fixed Bugs
~~~~~~~~~~
- Ensure that proxies use the correct TLS version when using the
``SSLAdapter``.
- Fix an ``AttributeError`` when using the ``HTTPProxyDigestAuth`` class.
Miscellaneous
~~~~~~~~~~~~~
- Drop testing support for Python 3.2. virtualenv and pip have stopped
supporting it, meaning that it is harder to test for this with our CI
infrastructure. Moving forward we will make a best-effort attempt to
support 3.2 but will not test for it.
.. _0.6.0 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestones/0.6.0
0.5.1 -- 2015-12-16
-------------------
More information about this release can be found on the `0.5.1 milestone`_.
Fixed Bugs
~~~~~~~~~~
- Now papers over the differences in requests' ``super_len`` function between
versions prior to 2.9.0 and versions 2.9.0 and later.
.. _0.5.1 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestones/0.5.1
0.5.0 -- 2015-11-24
-------------------
More information about this release can be found on the `milestone
<https://github.com/sigmavirus24/requests-toolbelt/issues?utf8=%E2%9C%93&q=is%3Aall+milestone%3A0.5+>`_
for 0.5.0.
New Features
~~~~~~~~~~~~
- The ``tee`` submodule was added to ``requests_toolbelt.downloadutils``. It
allows you to iterate over the bytes of a response while also writing them
to a file. The ``tee.tee`` function expects you to pass an open file
object, while ``tee.tee_to_file`` will use the provided file name to open
the file for you.
- Added a new parameter to ``requests_toolbelt.utils.user_agent`` that allows
the user to specify additional items.
- Added nested form-data helper,
``requests_toolbelt.utils.formdata.urlencode``.
- Added the ``ForgetfulCookieJar`` to ``requests_toolbelt.cookies``.
- Added utilities for dumping the information about a request-response cycle
in ``requests_toolbelt.utils.dump``.
- Implemented the API described in the ``requests_toolbelt.threaded`` module
docstring, i.e., added ``requests_toolbelt.threaded.map`` as an available
function.
Fixed Bugs
~~~~~~~~~~
- Now papers over the API differences in versions of requests installed from
system packages versus versions of requests installed from PyPI.
- Allow string types for ``SourceAddressAdapter``.
0.4.0 -- 2015-04-03
-------------------
For more information about this release, please see `milestone 0.4.0
<https://github.com/sigmavirus24/requests-toolbelt/issues?q=milestone%3A0.4>`_
on the project's page.
New Features
~~~~~~~~~~~~
- A naive implementation of a thread pool is now included in the toolbelt. See
the docs in ``docs/threading.rst`` or on `Read The Docs
<https://toolbelt.readthedocs.org>`_.
- The ``StreamingIterator`` now accepts files (such as ``sys.stdin``) without
a specific length and will properly stream them.
- The ``MultipartEncoder`` now accepts exactly the same format of fields as
requests' ``files`` parameter does. In other words, you can now also pass in
extra headers to add to a part in the body. You can also now specify a
custom ``Content-Type`` for a part.
- An implementation of HTTP Digest Authentication for Proxies is now included.
- A transport adapter that allows a user to specify a specific Certificate
Fingerprint is now included in the toolbelt.
- A transport adapter that simplifies how users specify socket options is now
included.
- A transport adapter that simplifies how users can specify TCP Keep-Alive
options is now included in the toolbelt.
- Deprecated functions from ``requests.utils`` are now included and
maintained.
- An authentication tool that allows users to specify how to authenticate to
several different domains at once is now included.
- A function to save streamed responses to disk by analyzing the
``Content-Disposition`` header is now included in the toolbelt.
Fixed Bugs
~~~~~~~~~~
- The ``MultipartEncoder`` will now allow users to upload files larger than
4GB on 32-bit systems.
- The ``MultipartEncoder`` will now accept empty unicode strings for form
values.
0.3.1 -- 2014-06-23
-------------------
- Fix the fact that the 0.3.0 bundle did not include the ``StreamingIterator``.
0.3.0 -- 2014-05-21
-------------------
Bug Fixes
~~~~~~~~~
- Complete rewrite of ``MultipartEncoder`` fixes bug where bytes were lost in
uploads
New Features
~~~~~~~~~~~~
- ``MultipartDecoder`` to accept ``multipart/form-data`` response bodies and
parse them into an easy-to-use object.
- ``SourceAddressAdapter`` to allow users to choose a local address to bind
connections to.
- ``GuessAuth`` which accepts a username and password and uses the
``WWW-Authenticate`` header to determine how to authenticate against a
server.
- ``MultipartEncoderMonitor`` wraps an instance of the ``MultipartEncoder``
and keeps track of how many bytes were read and will call the provided
callback.
- ``StreamingIterator`` will wrap an iterator and stream the upload instead of
chunking it, provided you also provide the length of the content you wish to
upload.
0.2.0 -- 2014-02-24
-------------------
- Add ability to tell ``MultipartEncoder`` which encoding to use. By default
it uses 'utf-8'.
- Fix #10 - allow users to install with pip
- Fix #9 - Fix ``MultipartEncoder#to_string`` so that it properly handles file
objects as fields
0.1.2 -- 2014-01-19
-------------------
- At some point during development we broke how we handle normal file objects.
Thanks to @konomae this is now fixed.
0.1.1 -- 2014-01-19
-------------------
- Handle ``io.BytesIO``-like objects better
0.1.0 -- 2014-01-18
-------------------
- Add initial implementation of the streaming ``MultipartEncoder``
- Add initial implementation of the ``user_agent`` function
- Add the ``SSLAdapter``
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: Implementation :: CPython

README.rst Normal file

@@ -0,0 +1,114 @@
requests toolbelt
=================
This is just a collection of utilities for `python-requests`_ that don't
really belong in ``requests`` proper. The minimum tested requests version is
``2.1.0``. In reality, the toolbelt should work with ``2.0.1`` as well, but
some idiosyncrasies prevent effective or sane testing on that version.
``pip install requests-toolbelt`` to get started!
multipart/form-data Encoder
---------------------------
The main attraction is a streaming multipart form-data object, ``MultipartEncoder``.
Its API looks like this:
.. code-block:: python
from requests_toolbelt import MultipartEncoder
import requests
m = MultipartEncoder(
fields={'field0': 'value', 'field1': 'value',
'field2': ('filename', open('file.py', 'rb'), 'text/plain')}
)
r = requests.post('http://httpbin.org/post', data=m,
headers={'Content-Type': m.content_type})
You can also use ``multipart/form-data`` encoding for requests that don't
require files:
.. code-block:: python
from requests_toolbelt import MultipartEncoder
import requests
m = MultipartEncoder(fields={'field0': 'value', 'field1': 'value'})
r = requests.post('http://httpbin.org/post', data=m,
headers={'Content-Type': m.content_type})
Or, you can just create the string and examine the data:
.. code-block:: python
# Assuming `m` is one of the above
m.to_string() # Always returns unicode
User-Agent constructor
----------------------
You can easily construct a requests-style ``User-Agent`` string::
from requests_toolbelt import user_agent
headers = {
'User-Agent': user_agent('my_package', '0.0.1')
}
r = requests.get('https://api.github.com/users', headers=headers)
SSLAdapter
----------
The ``SSLAdapter`` was originally published on `Cory Benfield's blog`_.
This adapter allows the user to choose one of the SSL protocols made available
in Python's ``ssl`` module for outgoing HTTPS connections:
.. code-block:: python
from requests_toolbelt import SSLAdapter
import requests
import ssl
s = requests.Session()
s.mount('https://', SSLAdapter(ssl.PROTOCOL_TLSv1))
cookies/ForgetfulCookieJar
--------------------------
The ``ForgetfulCookieJar`` prevents a particular requests session from storing
cookies:
.. code-block:: python
from requests_toolbelt.cookies.forgetful import ForgetfulCookieJar
session = requests.Session()
session.cookies = ForgetfulCookieJar()
Known Issues
------------
On Python 3.3.0 and 3.3.1, the standard library's ``http`` module will fail
when passed an instance of the ``MultipartEncoder``. This is fixed in later
minor releases of Python 3.3. Please consider upgrading to a later minor
version or Python 3.4. *There is absolutely nothing this library can do to
work around that bug.*
Contributing
------------
Please read the `suggested workflow
<https://toolbelt.readthedocs.org/en/latest/contributing.html>`_ for
contributing to this project.
.. _Cory Benfield's blog: https://lukasa.co.uk/2013/01/Choosing_SSL_Version_In_Requests/
.. _python-requests: https://github.com/kennethreitz/requests

dev-requirements.txt Normal file

@@ -0,0 +1,3 @@
pytest
mock
git+git://github.com/sigmavirus24/betamax

docs/Makefile Normal file

@@ -0,0 +1,177 @@
# Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
# User-friendly check for sphinx-build
ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
$(error The '$(SPHINXBUILD)' command was not found. Make sure you have Sphinx installed, then set the SPHINXBUILD environment variable to point to the full path of the '$(SPHINXBUILD)' executable. Alternatively you can add the directory with the executable to your PATH. If you don't have Sphinx installed, grab it from http://sphinx-doc.org/)
endif
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
# the i18n builder cannot share the environment and doctrees with the others
I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " html to make standalone HTML files"
@echo " dirhtml to make HTML files named index.html in directories"
@echo " singlehtml to make a single large HTML file"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " qthelp to make HTML files and a qthelp project"
@echo " devhelp to make HTML files and a Devhelp project"
@echo " epub to make an epub"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " latexpdf to make LaTeX files and run them through pdflatex"
@echo " latexpdfja to make LaTeX files and run them through platex/dvipdfmx"
@echo " text to make text files"
@echo " man to make manual pages"
@echo " texinfo to make Texinfo files"
@echo " info to make Texinfo files and run them through makeinfo"
@echo " gettext to make PO message catalogs"
@echo " changes to make an overview of all changed/added/deprecated items"
@echo " xml to make Docutils-native XML files"
@echo " pseudoxml to make pseudoxml-XML files for display purposes"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
clean:
rm -rf $(BUILDDIR)/*
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
singlehtml:
$(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
@echo
@echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
pickle:
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
json:
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
htmlhelp:
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
qthelp:
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/requests_toolbelt.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/requests_toolbelt.qhc"
devhelp:
$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
@echo
@echo "Build finished."
@echo "To view the help file:"
@echo "# mkdir -p $$HOME/.local/share/devhelp/requests_toolbelt"
@echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/requests_toolbelt"
@echo "# devhelp"
epub:
$(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
@echo
@echo "Build finished. The epub file is in $(BUILDDIR)/epub."
latex:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make' in that directory to run these through (pdf)latex" \
"(use \`make latexpdf' here to do that automatically)."
latexpdf:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through pdflatex..."
$(MAKE) -C $(BUILDDIR)/latex all-pdf
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
latexpdfja:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through platex and dvipdfmx..."
$(MAKE) -C $(BUILDDIR)/latex all-pdf-ja
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
text:
$(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
@echo
@echo "Build finished. The text files are in $(BUILDDIR)/text."
man:
$(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
@echo
@echo "Build finished. The manual pages are in $(BUILDDIR)/man."
texinfo:
$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
@echo
@echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo."
@echo "Run \`make' in that directory to run these through makeinfo" \
"(use \`make info' here to do that automatically)."
info:
$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
@echo "Running Texinfo files through makeinfo..."
make -C $(BUILDDIR)/texinfo info
@echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo."
gettext:
$(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale
@echo
@echo "Build finished. The message catalogs are in $(BUILDDIR)/locale."
changes:
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
linkcheck:
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
doctest:
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."
xml:
$(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml
@echo
@echo "Build finished. The XML files are in $(BUILDDIR)/xml."
pseudoxml:
$(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml
@echo
@echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml."

docs/adapters.rst Normal file

@@ -0,0 +1,245 @@
.. _adapters:
Transport Adapters
==================
The toolbelt comes with several different transport adapters for you to use
with requests. The transport adapters are all kept in
:mod:`requests_toolbelt.adapters` and include
- :class:`requests_toolbelt.adapters.appengine.AppEngineAdapter`
- :class:`requests_toolbelt.adapters.fingerprint.FingerprintAdapter`
- :class:`requests_toolbelt.adapters.socket_options.SocketOptionsAdapter`
- :class:`requests_toolbelt.adapters.socket_options.TCPKeepAliveAdapter`
- :class:`requests_toolbelt.adapters.source.SourceAddressAdapter`
- :class:`requests_toolbelt.adapters.ssl.SSLAdapter`
- :class:`requests_toolbelt.adapters.host_header_ssl.HostHeaderSSLAdapter`
AppEngineAdapter
----------------
.. versionadded:: 0.6.0
As of version 2.10.0, Requests will be capable of supporting Google's App
Engine platform. In order to use Requests on GAE, however, you will need a
custom adapter found here as
:class:`~requests_toolbelt.adapters.appengine.AppEngineAdapter`. There are two
ways to take advantage of this support at the moment:
#. Using the :class:`~requests_toolbelt.adapters.appengine.AppEngineAdapter`
like every other adapter, e.g.,
.. code-block:: python
import requests
from requests_toolbelt.adapters import appengine
s = requests.Session()
s.mount('http://', appengine.AppEngineAdapter())
s.mount('https://', appengine.AppEngineAdapter())
#. By monkey-patching requests to always use the provided adapter:
.. code-block:: python
import requests
from requests_toolbelt.adapters import appengine
appengine.monkeypatch()
.. _insecure_appengine:
If you need to disable certificate validation when monkeypatching (for
example, to force third-party libraries that use Requests to skip validation
when they do not provide an API surface to do so), you can disable it:
.. code-block:: python
from requests_toolbelt.adapters import appengine
appengine.monkeypatch(validate_certificate=False)
.. warning::
If ``validate_certificate`` is ``False``, the monkeypatched adapter
will *not* validate certificates. This effectively sets the
``validate_certificate`` argument to urlfetch.Fetch() to ``False``. You
should avoid using this wherever possible. Details can be found in the
`documentation for urlfetch.Fetch()`_.
.. _documentation for urlfetch.Fetch(): https://cloud.google.com/appengine/docs/python/refdocs/google.appengine.api.urlfetch
.. autoclass:: requests_toolbelt.adapters.appengine.AppEngineAdapter
FingerprintAdapter
------------------
.. versionadded:: 0.4.0
By default, requests will validate a server's certificate to ensure a
connection is secure. In addition to this, the user can provide a fingerprint
of the certificate they're expecting to receive. Unfortunately, the requests
API does not support this fairly rare use-case. When a user needs this extra
validation, they should use the
:class:`~requests_toolbelt.adapters.fingerprint.FingerprintAdapter` class to
perform the validation.
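For illustration, here is a minimal sketch of mounting the adapter for a
single host; the fingerprint value below is a placeholder, not a real
certificate digest:

.. code-block:: python

    import requests
    from requests_toolbelt.adapters.fingerprint import FingerprintAdapter

    # Hex SHA-256 fingerprint of the certificate you expect (placeholder).
    expected_fingerprint = 'aa:bb:cc:...'

    s = requests.Session()
    s.mount('https://example.com', FingerprintAdapter(expected_fingerprint))
    # This request fails unless the certificate presented by the server
    # matches the pinned fingerprint exactly.
    r = s.get('https://example.com')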
.. autoclass:: requests_toolbelt.adapters.fingerprint.FingerprintAdapter
SSLAdapter
----------
The ``SSLAdapter`` is the canonical implementation of the adapter proposed on
Cory Benfield's blog, `here`_. This adapter allows the user to choose one of
the SSL/TLS protocols made available in Python's ``ssl`` module for outgoing
HTTPS connections.
In principle, this shouldn't be necessary: compliant SSL servers should be able
to negotiate the required SSL version. In practice there have been bugs in some
versions of OpenSSL that mean that this negotiation doesn't go as planned. It
can be useful to be able to simply plug in a Transport Adapter that can paper
over the problem.
For example, suppose you're having difficulty with the server that provides TLS
for GitHub. You can work around it by using the following code::
from requests_toolbelt.adapters.ssl import SSLAdapter
import requests
import ssl
s = requests.Session()
s.mount('https://github.com/', SSLAdapter(ssl.PROTOCOL_TLSv1))
Any future requests to GitHub made through that adapter will automatically
attempt to negotiate TLSv1, and hopefully will succeed.
.. autoclass:: requests_toolbelt.adapters.ssl.SSLAdapter
.. _here: https://lukasa.co.uk/2013/01/Choosing_SSL_Version_In_Requests/
HostHeaderSSLAdapter
--------------------
.. versionadded:: 0.7.0
Requests supports SSL verification by default. However, it relies on the user
making a request with a URL that has the hostname in it. If the user instead
needs to make a request with the IP address, they cannot actually verify a
certificate against the hostname they want to request.
To accommodate this very rare need, we've added
:class:`~requests_toolbelt.adapters.host_header_ssl.HostHeaderSSLAdapter`.
Example usage:
.. code-block:: python
import requests
from requests_toolbelt.adapters import host_header_ssl
s = requests.Session()
s.mount('https://', host_header_ssl.HostHeaderSSLAdapter())
s.get("https://93.184.216.34", headers={"Host": "example.org"})
.. autoclass:: requests_toolbelt.adapters.host_header_ssl.HostHeaderSSLAdapter
SourceAddressAdapter
--------------------
.. versionadded:: 0.3.0
The :class:`~requests_toolbelt.adapters.source.SourceAddressAdapter` allows a
user to specify a source address for their connection.
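A short usage sketch, assuming the adapter accepts a plain IP address string
(string support was added in 0.5.0, per the changelog above):

.. code-block:: python

    import requests
    from requests_toolbelt.adapters.source import SourceAddressAdapter

    s = requests.Session()
    # Bind all outgoing connections made through this session to a
    # specific local interface.
    s.mount('http://', SourceAddressAdapter('10.10.10.10'))
    s.mount('https://', SourceAddressAdapter('10.10.10.10'))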
.. autoclass:: requests_toolbelt.adapters.source.SourceAddressAdapter
SocketOptionsAdapter
--------------------
.. versionadded:: 0.4.0
.. note::
This adapter will only work with requests 2.4.0 or newer. The ability to
set arbitrary socket options does not exist prior to requests 2.4.0.
The ``SocketOptionsAdapter`` allows a user to pass specific options to be set
on created sockets when constructing the Adapter without subclassing. The
adapter takes advantage of ``urllib3``'s `support`_ for setting arbitrary
socket options for each ``urllib3.connection.HTTPConnection`` (and
``HTTPSConnection``).
To pass socket options, you need to send a list of three-item tuples. For
example, ``requests`` and ``urllib3`` disable `Nagle's Algorithm`_ by default.
If you need to re-enable it, you would do the following:
.. code-block:: python
import socket
import requests
from requests_toolbelt.adapters.socket_options import SocketOptionsAdapter
nagles = [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 0)]
session = requests.Session()
for scheme in session.adapters.keys():
session.mount(scheme, SocketOptionsAdapter(socket_options=nagles))
This would re-enable Nagle's Algorithm for all ``http://`` and ``https://``
connections made with that session.
.. autoclass:: requests_toolbelt.adapters.socket_options.SocketOptionsAdapter
.. _support: https://urllib3.readthedocs.org/en/latest/pools.html?highlight=socket_options#urllib3.connection.HTTPConnection.socket_options
.. _Nagle's Algorithm: https://en.wikipedia.org/wiki/Nagle%27s_algorithm
TCPKeepAliveAdapter
-------------------
.. versionadded:: 0.4.0
.. note::
This adapter will only work with requests 2.4.0 or newer. The ability to
set arbitrary socket options does not exist prior to requests 2.4.0.
The ``TCPKeepAliveAdapter`` allows a user to pass specific keep-alive related
options as keyword parameters as well as arbitrary socket options.
.. note::
Different keep-alive related socket options may not be available for your
platform. Check the socket module for the availability of the following
constants:
- ``socket.TCP_KEEPIDLE``
- ``socket.TCP_KEEPCNT``
- ``socket.TCP_KEEPINTVL``
The adapter will silently ignore any option passed for a constant that is
not available on your platform.
An example usage of the adapter:
.. code-block:: python
import requests
from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter
session = requests.Session()
keep_alive = TCPKeepAliveAdapter(idle=120, count=20, interval=30)
session.mount('https://region-a.geo-1.compute.hpcloudsvc.com', keep_alive)
session.post('https://region-a.geo-1.compute.hpcloudsvc.com/v2/1234abcdef/servers',
# ...
)
In this case we know that creating a server on HP Public Cloud can cause
requests to hang without using TCP Keep-Alive. So we mount the adapter
specifically for that domain, instead of adding it to every ``https://`` and
``http://`` request.
.. autoclass:: requests_toolbelt.adapters.socket_options.TCPKeepAliveAdapter

docs/authentication.rst Normal file

@@ -0,0 +1,142 @@
.. _authentication:
Authentication
==============
requests supports Basic Authentication and HTTP Digest Authentication by
default. There are also a number of third-party libraries for authentication
with:
- `OAuth <https://requests-oauthlib.readthedocs.org/en/latest/>`_
- `NTLM <https://github.com/requests/requests-ntlm>`_
- `Kerberos <https://github.com/requests/requests-kerberos>`_
The :mod:`requests_toolbelt.auth` module provides extra authentication
features in addition to those. It provides the following authentication
classes:
- :class:`requests_toolbelt.auth.guess.GuessAuth`
- :class:`requests_toolbelt.auth.http_proxy_digest.HTTPProxyDigestAuth`
- :class:`requests_toolbelt.auth.handler.AuthHandler`
AuthHandler
-----------
The :class:`~requests_toolbelt.auth.handler.AuthHandler` is a way of using a
single session with multiple websites that require authentication, when you
know which websites require a certain kind of authentication and what your
credentials are.
Take, for example, a session that needs to authenticate to both GitHub's API
and GitLab's API; you would set up and use your
:class:`~requests_toolbelt.auth.handler.AuthHandler` like so:
.. code-block:: python
import requests
from requests_toolbelt.auth.handler import AuthHandler
def gitlab_auth(request):
request.headers['PRIVATE-TOKEN'] = 'asecrettoken'
handler = AuthHandler({
'https://api.github.com': ('sigmavirus24', 'apassword'),
'https://gitlab.com': gitlab_auth,
})
session = requests.Session()
session.auth = handler
r = session.get('https://api.github.com/user')
# assert r.ok
r2 = session.get('https://gitlab.com/api/v3/projects')
# assert r2.ok
.. note::
You **must** provide both the scheme and domain for authentication. The
:class:`~requests_toolbelt.auth.handler.AuthHandler` class will check both
the scheme and host to ensure your data is not accidentally exposed.
.. autoclass:: requests_toolbelt.auth.handler.AuthHandler
:members:
GuessAuth
---------
The :class:`~requests_toolbelt.auth.guess.GuessAuth` authentication class
automatically detects whether to use basic auth or digest auth:
.. code-block:: python
import requests
from requests_toolbelt.auth.guess import GuessAuth
requests.get('http://httpbin.org/basic-auth/user/passwd',
auth=GuessAuth('user', 'passwd'))
requests.get('http://httpbin.org/digest-auth/auth/user/passwd',
auth=GuessAuth('user', 'passwd'))
Detection of the auth type is done via the ``WWW-Authenticate`` header sent by
the server. This requires an additional request in case of basic auth, as
usually basic auth is sent preemptively. If the server didn't explicitly
require authentication, no credentials are sent.
.. autoclass:: requests_toolbelt.auth.guess.GuessAuth
GuessProxyAuth
--------------
The :class:`~requests_toolbelt.auth.guess.GuessProxyAuth` handler will
automatically detect whether to use basic authentication or digest authentication
when authenticating to the provided proxy.
.. code-block:: python
import requests
from requests_toolbelt.auth.guess import GuessProxyAuth
proxies = {
"http": "http://PROXYSERVER:PROXYPORT",
"https": "http://PROXYSERVER:PROXYPORT",
}
requests.get('http://httpbin.org/basic-auth/user/passwd',
auth=GuessProxyAuth('user', 'passwd', 'proxyusr', 'proxypass'),
proxies=proxies)
requests.get('http://httpbin.org/digest-auth/auth/user/passwd',
auth=GuessProxyAuth('user', 'passwd', 'proxyusr', 'proxypass'),
proxies=proxies)
Detection of the auth type is done via the ``Proxy-Authenticate`` header sent by
the server. This requires an additional request in case of basic auth, as
usually basic auth is sent preemptively. If the server didn't explicitly
require authentication, no credentials are sent.
.. autoclass:: requests_toolbelt.auth.guess.GuessProxyAuth
HTTPProxyDigestAuth
-------------------
The ``HTTPProxyDigestAuth`` uses digest authentication between the client and
the proxy.
.. code-block:: python
import requests
from requests_toolbelt.auth.http_proxy_digest import HTTPProxyDigestAuth
proxies = {
"http": "http://PROXYSERVER:PROXYPORT",
"https": "https://PROXYSERVER:PROXYPORT",
}
url = "https://toolbelt.readthedocs.org/"
auth = HTTPProxyDigestAuth("USERNAME", "PASSWORD")
requests.get(url, proxies=proxies, auth=auth)
The program will raise an error if the username or password is rejected by
the proxy.
.. autoclass:: requests_toolbelt.auth.http_proxy_digest.HTTPProxyDigestAuth

docs/conf.py Normal file

@@ -0,0 +1,271 @@
# -*- coding: utf-8 -*-
#
# requests_toolbelt documentation build configuration file, created by
# sphinx-quickstart on Sun Jan 12 21:24:39 2014.
#
# This file is execfile()d with the current directory set to its
# containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
sys.path.insert(0, os.path.abspath('..'))
# -- General configuration ------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.intersphinx',
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The encoding of source files.
#source_encoding = 'utf-8-sig'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'requests_toolbelt'
copyright = u'2015, Ian Cordasco, Cory Benfield'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
from requests_toolbelt import __version__ as version
# The full version, including alpha/beta/rc tags.
release = version
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = ['_build']
# The reST default role (used for this markup: `text`) to use for all
# documents.
#default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []
# If true, keep warnings as "system message" paragraphs in the built documents.
#keep_warnings = False
# -- Options for HTML output ----------------------------------------------
on_rtd = os.environ.get('READTHEDOCS', None) == 'True'
if not on_rtd: # only import and set the theme if we're building docs locally
import sphinx_rtd_theme
html_theme = 'sphinx_rtd_theme'
html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#html_theme = 'alabaster'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
#html_static_path = ['_static']
# Add any extra paths that contain custom files (such as robots.txt or
# .htaccess) here, relative to this directory. These files are copied
# directly to the root of the documentation.
#html_extra_path = []
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# If false, no module index is generated.
#html_domain_indices = True
# If false, no index is generated.
#html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
#html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
#html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = None
# Output file base name for HTML help builder.
htmlhelp_basename = 'requests_toolbelt-doc'
# -- Options for LaTeX output ---------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#'preamble': '',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
('index', 'requests_toolbelt.tex', u'requests\\_toolbelt Documentation',
u'Ian Cordasco, Cory Benfield', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# If true, show page references after internal links.
#latex_show_pagerefs = False
# If true, show URL addresses after external links.
#latex_show_urls = False
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# If false, no module index is generated.
#latex_domain_indices = True
# -- Options for manual page output ---------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
('index', 'requests_toolbelt', u'requests_toolbelt Documentation',
[u'Ian Cordasco, Cory Benfield'], 1)
]
# If true, show URL addresses after external links.
#man_show_urls = False
# -- Options for Texinfo output -------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
('index', 'requests_toolbelt', u'requests_toolbelt Documentation',
u'Ian Cordasco, Cory Benfield', 'requests_toolbelt', 'One line description of project.',
'Miscellaneous'),
]
# Documents to append as an appendix to all manuals.
#texinfo_appendices = []
# If false, no module index is generated.
#texinfo_domain_indices = True
# How to display URL addresses: 'footnote', 'no', or 'inline'.
#texinfo_show_urls = 'footnote'
# If true, do not generate a @detailmenu in the "Top" node's menu.
#texinfo_no_detailmenu = False
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {'http://docs.python.org/': None}

docs/contributing.rst Normal file

@@ -0,0 +1,161 @@
Contributing to this project
============================
Checklist
---------
#. All potential contributors must read the :ref:`code-of-conduct` and follow
it
#. Fork the repository on `GitHub`_ or `GitLab`_
#. Create a new branch, e.g., ``git checkout -b bug/12345``
#. Fix the bug and add tests (if applicable [#]_, see :ref:`how-to-add-tests`)
#. Run the tests (see :ref:`how-to-run-tests` below)
#. Add documentation (as necessary) for your change
#. Build the documentation to check for errors and formatting (see
:ref:`how-to-build-the-docs` below)
#. Add yourself to the :file:`AUTHORS.rst` (unless you're already there)
#. Commit it. Follow these rules in your commit message:
* Keep the subject line under 50 characters
* Use an imperative verb to start the commit
* Use an empty line between the subject and the message
* Describe the *why* in detail in the message portion of the commit
* Wrap the lines of the message at 72 characters
* Add the appropriate "Closes #12345" syntax to autoclose the issue it
fixed (if it closes an issue)
* See :ref:`example-commit-message` below
#. Push it to your fork
#. Create a request for us to merge your contribution
After this last step, it is possible that we may leave feedback in the form of
review comments. When addressing these comments, you can follow two
strategies:
* Amend/rebase your changes into an existing commit
* Create a new commit with a different message [#]_ describing the changes in
that commit and push it to your branch
This project is not opinionated about which approach you should prefer. We
only ask that you are aware of the following:
* Neither GitHub nor GitLab notifies us that you have pushed new changes. A
friendly ping is encouraged
* If you continue to use the same branch that you created the request from,
both GitHub and GitLab will update the request on the website. You do
**not** need to create a new request for the new changes.
.. _code-of-conduct:
.. include:: ../CODE_OF_CONDUCT.rst
.. _how-to-add-tests:
How To Add Tests
----------------
We use `pytest`_ to run tests and to simplify how we write tests. If you're
fixing a bug in an existing module, please find the tests for that module or
feature and add to them. Most tests live in the ``tests`` directory. If you're
adding a
new feature in a new submodule, please create a new module of test code. For
example, if you're adding a submodule named ``foo`` then you would create
``tests/test_foo.py`` which will contain the tests for the ``foo`` submodule.
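A minimal sketch of such a test module might look like this (``frobnicate``
and its expected return value are purely illustrative placeholders, not real
toolbelt APIs):

.. code-block:: python

    # tests/test_foo.py -- the names below are illustrative only
    from requests_toolbelt import foo

    def test_frobnicate_returns_expected_value():
        assert foo.frobnicate('bar') == 'frobnicated bar'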
.. _how-to-run-tests:
How To Run The Tests
--------------------
Run the tests in this project using `tox`_. Before you run the tests, ensure
you have installed tox, either using your system package manager (e.g., apt,
yum, etc.) or your preferred Python installer (e.g., pip).
Then run the tests on at least Python 2.7 and Python 3.x, e.g.,
.. code::
$ tox -e py27,py34
Finally run one, or both, of the flake8 style enforcers, e.g.,
.. code::
$ tox -e py27-flake8
# or
$ tox -e py34-flake8
It is preferable to run both, to catch syntax errors that might occur in
Python 2 or Python 3 (depending on how familiar you are with the subset of the
language common to both).
Tox will manage virtual environments and dependencies for you so it will be
the only dependency you need to install to contribute to this project.
.. _how-to-build-the-docs:
How To Build The Documentation
------------------------------
To build the docs, you need to ensure tox is installed and then you may run
.. code::
$ tox -e docs
This will build the documentation into ``docs/_build/html``. If you then run
.. code::
$ python2.7 -m SimpleHTTPServer
# or
$ python3.4 -m http.server
from that directory, you can view the docs locally at http://localhost:8000/.
.. _example-commit-message:
Example Commit Message
----------------------
::
Allow users to use the frob when uploading data
When uploading data with FooBar, users may need to use the frob method
to ensure that pieces of data are not munged.
Closes #1234567
Footnotes
---------
.. [#] You might not need tests if you're updating documentation, fixing a
typo, or updating a docstring. If you're fixing a bug, please add
tests.
.. [#] If each commit has the same message, the reviewer may ask you to
squash your commits or may squash them for you and perform a manual
merge.
.. _GitHub: https://github.com/sigmavirus24/requests-toolbelt
.. _GitLab: https://gitlab.com/sigmavirus24/toolbelt
.. _tox: https://tox.readthedocs.org/en/latest/
.. _pytest: http://pytest.org/latest/

13
docs/deprecated.rst Normal file
View File

@@ -0,0 +1,13 @@
.. _deprecated:
Deprecated Requests Utilities
=============================
Requests has `decided`_ to deprecate some utility functions in
:mod:`requests.utils`. To ease users' lives, they've been moved to
:mod:`requests_toolbelt.utils.deprecated`.
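For example, if you previously relied on ``get_encodings_from_content``
(whose toolbelt copy is mentioned in this project's changelog), a minimal
sketch of the switch looks like:

.. code-block:: python

    import requests
    from requests_toolbelt.utils import deprecated

    r = requests.get('https://example.com')
    encodings = deprecated.get_encodings_from_content(r.text)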
.. automodule:: requests_toolbelt.utils.deprecated
:members:
.. _decided: https://github.com/kennethreitz/requests/issues/2266

16
docs/downloadutils.rst Normal file
View File

@@ -0,0 +1,16 @@
.. _downloadutils:
Utilities for Downloading Streaming Responses
=============================================
.. autofunction::
requests_toolbelt.downloadutils.stream.stream_response_to_file
.. autofunction::
requests_toolbelt.downloadutils.tee.tee
.. autofunction::
requests_toolbelt.downloadutils.tee.tee_to_bytearray
.. autofunction::
requests_toolbelt.downloadutils.tee.tee_to_file
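A minimal sketch of typical usage (the URL and file name are illustrative);
note that passing ``stream=True`` keeps the body from being read into memory
up front:

.. code-block:: python

    import requests
    from requests_toolbelt.downloadutils import stream

    r = requests.get('https://example.com/file.tar.gz', stream=True)
    filename = stream.stream_response_to_file(r, path='file.tar.gz')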

17
docs/dumputils.rst Normal file
View File

@@ -0,0 +1,17 @@
.. _dumputils:
Utilities for Dumping Information About Responses
=================================================
Occasionally, it is helpful to know almost exactly what data was sent to a
server and what data was received. It can also be challenging at times to
gather all of that data from requests because of all of the different places
you may need to look to find it. In :mod:`requests_toolbelt.utils.dump` there
are two functions that will return a :class:`bytearray` with the information
retrieved from a response object.
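For example, a quick sketch that dumps a simple request-response cycle:

.. code-block:: python

    import requests
    from requests_toolbelt.utils import dump

    resp = requests.get('https://httpbin.org/get')
    data = dump.dump_response(resp)
    print(data.decode('utf-8'))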
.. autofunction::
requests_toolbelt.utils.dump.dump_all
.. autofunction::
requests_toolbelt.utils.dump.dump_response

10
docs/exceptions.rst Normal file
View File

@@ -0,0 +1,10 @@
.. _exceptions:
Custom Toolbelt Exceptions
==========================
Below are the exception classes used by the toolbelt to provide error details
to the user of the toolbelt.
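For example, the AppEngine adapter raises
:class:`~requests_toolbelt.exceptions.VersionMismatchError` when the installed
Requests is too old, so you could guard a monkey-patch like this (a sketch,
not a recommendation):

.. code-block:: python

    from requests_toolbelt import exceptions as exc
    from requests_toolbelt.adapters import appengine

    try:
        appengine.monkeypatch()
    except exc.VersionMismatchError as error:
        print(error)  # the adapter requires Requests >= 2.10.0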
.. automodule:: requests_toolbelt.exceptions
:members:

7
docs/formdata.rst Normal file
View File

@@ -0,0 +1,7 @@
.. _formdatautils:
Utilities for Enhanced Form-Data Serialization
==============================================
.. autofunction::
requests_toolbelt.utils.formdata.urlencode
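A brief sketch of the nested form-data helper (the encoded output shown in
the comment is approximate; brackets are percent-encoded in the real result):

.. code-block:: python

    from requests_toolbelt.utils import formdata

    payload = {'name': {'first': 'Ada', 'last': 'Lovelace'}}
    body = formdata.urlencode(payload)
    # roughly: name[first]=Ada&name[last]=Lovelace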

50
docs/index.rst Normal file
View File

@@ -0,0 +1,50 @@
.. requests_toolbelt documentation master file, created by
sphinx-quickstart on Sun Jan 12 21:24:39 2014.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
requests toolbelt
=================
This is a collection of utilities that some users of python-requests might need
but that do not belong in requests proper. The library is actively maintained by
members of the requests core development team, and so reflects the
functionality most requested by users of the requests library.
To get an overview of what the library contains, consult the :ref:`user <user>`
documentation.
Overview
--------
.. toctree::
:maxdepth: 1
user
contributing
Full Documentation
------------------
.. toctree::
:maxdepth: 2
adapters
authentication
deprecated
downloadutils
dumputils
formdata
exceptions
sessions
threading
uploading-data
user-agent
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

242
docs/make.bat Normal file
View File

@@ -0,0 +1,242 @@
@ECHO OFF
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set BUILDDIR=_build
set ALLSPHINXOPTS=-d %BUILDDIR%/doctrees %SPHINXOPTS% .
set I18NSPHINXOPTS=%SPHINXOPTS% .
if NOT "%PAPER%" == "" (
set ALLSPHINXOPTS=-D latex_paper_size=%PAPER% %ALLSPHINXOPTS%
set I18NSPHINXOPTS=-D latex_paper_size=%PAPER% %I18NSPHINXOPTS%
)
if "%1" == "" goto help
if "%1" == "help" (
:help
echo.Please use `make ^<target^>` where ^<target^> is one of
echo. html to make standalone HTML files
echo. dirhtml to make HTML files named index.html in directories
echo. singlehtml to make a single large HTML file
echo. pickle to make pickle files
echo. json to make JSON files
echo. htmlhelp to make HTML files and a HTML help project
echo. qthelp to make HTML files and a qthelp project
echo. devhelp to make HTML files and a Devhelp project
echo. epub to make an epub
echo. latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter
echo. text to make text files
echo. man to make manual pages
echo. texinfo to make Texinfo files
echo. gettext to make PO message catalogs
echo. changes to make an overview over all changed/added/deprecated items
echo. xml to make Docutils-native XML files
echo. pseudoxml to make pseudoxml-XML files for display purposes
echo. linkcheck to check all external links for integrity
echo. doctest to run all doctests embedded in the documentation if enabled
goto end
)
if "%1" == "clean" (
for /d %%i in (%BUILDDIR%\*) do rmdir /q /s %%i
del /q /s %BUILDDIR%\*
goto end
)
%SPHINXBUILD% 2> nul
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.http://sphinx-doc.org/
exit /b 1
)
if "%1" == "html" (
%SPHINXBUILD% -b html %ALLSPHINXOPTS% %BUILDDIR%/html
if errorlevel 1 exit /b 1
echo.
echo.Build finished. The HTML pages are in %BUILDDIR%/html.
goto end
)
if "%1" == "dirhtml" (
%SPHINXBUILD% -b dirhtml %ALLSPHINXOPTS% %BUILDDIR%/dirhtml
if errorlevel 1 exit /b 1
echo.
echo.Build finished. The HTML pages are in %BUILDDIR%/dirhtml.
goto end
)
if "%1" == "singlehtml" (
%SPHINXBUILD% -b singlehtml %ALLSPHINXOPTS% %BUILDDIR%/singlehtml
if errorlevel 1 exit /b 1
echo.
echo.Build finished. The HTML pages are in %BUILDDIR%/singlehtml.
goto end
)
if "%1" == "pickle" (
%SPHINXBUILD% -b pickle %ALLSPHINXOPTS% %BUILDDIR%/pickle
if errorlevel 1 exit /b 1
echo.
echo.Build finished; now you can process the pickle files.
goto end
)
if "%1" == "json" (
%SPHINXBUILD% -b json %ALLSPHINXOPTS% %BUILDDIR%/json
if errorlevel 1 exit /b 1
echo.
echo.Build finished; now you can process the JSON files.
goto end
)
if "%1" == "htmlhelp" (
%SPHINXBUILD% -b htmlhelp %ALLSPHINXOPTS% %BUILDDIR%/htmlhelp
if errorlevel 1 exit /b 1
echo.
echo.Build finished; now you can run HTML Help Workshop with the ^
.hhp project file in %BUILDDIR%/htmlhelp.
goto end
)
if "%1" == "qthelp" (
%SPHINXBUILD% -b qthelp %ALLSPHINXOPTS% %BUILDDIR%/qthelp
if errorlevel 1 exit /b 1
echo.
echo.Build finished; now you can run "qcollectiongenerator" with the ^
.qhcp project file in %BUILDDIR%/qthelp, like this:
echo.^> qcollectiongenerator %BUILDDIR%\qthelp\requests_toolbelt.qhcp
echo.To view the help file:
echo.^> assistant -collectionFile %BUILDDIR%\qthelp\requests_toolbelt.qhc
goto end
)
if "%1" == "devhelp" (
%SPHINXBUILD% -b devhelp %ALLSPHINXOPTS% %BUILDDIR%/devhelp
if errorlevel 1 exit /b 1
echo.
echo.Build finished.
goto end
)
if "%1" == "epub" (
%SPHINXBUILD% -b epub %ALLSPHINXOPTS% %BUILDDIR%/epub
if errorlevel 1 exit /b 1
echo.
echo.Build finished. The epub file is in %BUILDDIR%/epub.
goto end
)
if "%1" == "latex" (
%SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
if errorlevel 1 exit /b 1
echo.
echo.Build finished; the LaTeX files are in %BUILDDIR%/latex.
goto end
)
if "%1" == "latexpdf" (
%SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
cd %BUILDDIR%/latex
make all-pdf
cd %BUILDDIR%/..
echo.
echo.Build finished; the PDF files are in %BUILDDIR%/latex.
goto end
)
if "%1" == "latexpdfja" (
%SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
cd %BUILDDIR%/latex
make all-pdf-ja
cd %BUILDDIR%/..
echo.
echo.Build finished; the PDF files are in %BUILDDIR%/latex.
goto end
)
if "%1" == "text" (
%SPHINXBUILD% -b text %ALLSPHINXOPTS% %BUILDDIR%/text
if errorlevel 1 exit /b 1
echo.
echo.Build finished. The text files are in %BUILDDIR%/text.
goto end
)
if "%1" == "man" (
%SPHINXBUILD% -b man %ALLSPHINXOPTS% %BUILDDIR%/man
if errorlevel 1 exit /b 1
echo.
echo.Build finished. The manual pages are in %BUILDDIR%/man.
goto end
)
if "%1" == "texinfo" (
%SPHINXBUILD% -b texinfo %ALLSPHINXOPTS% %BUILDDIR%/texinfo
if errorlevel 1 exit /b 1
echo.
echo.Build finished. The Texinfo files are in %BUILDDIR%/texinfo.
goto end
)
if "%1" == "gettext" (
%SPHINXBUILD% -b gettext %I18NSPHINXOPTS% %BUILDDIR%/locale
if errorlevel 1 exit /b 1
echo.
echo.Build finished. The message catalogs are in %BUILDDIR%/locale.
goto end
)
if "%1" == "changes" (
%SPHINXBUILD% -b changes %ALLSPHINXOPTS% %BUILDDIR%/changes
if errorlevel 1 exit /b 1
echo.
echo.The overview file is in %BUILDDIR%/changes.
goto end
)
if "%1" == "linkcheck" (
%SPHINXBUILD% -b linkcheck %ALLSPHINXOPTS% %BUILDDIR%/linkcheck
if errorlevel 1 exit /b 1
echo.
echo.Link check complete; look for any errors in the above output ^
or in %BUILDDIR%/linkcheck/output.txt.
goto end
)
if "%1" == "doctest" (
%SPHINXBUILD% -b doctest %ALLSPHINXOPTS% %BUILDDIR%/doctest
if errorlevel 1 exit /b 1
echo.
echo.Testing of doctests in the sources finished, look at the ^
results in %BUILDDIR%/doctest/output.txt.
goto end
)
if "%1" == "xml" (
%SPHINXBUILD% -b xml %ALLSPHINXOPTS% %BUILDDIR%/xml
if errorlevel 1 exit /b 1
echo.
echo.Build finished. The XML files are in %BUILDDIR%/xml.
goto end
)
if "%1" == "pseudoxml" (
%SPHINXBUILD% -b pseudoxml %ALLSPHINXOPTS% %BUILDDIR%/pseudoxml
if errorlevel 1 exit /b 1
echo.
echo.Build finished. The pseudo-XML files are in %BUILDDIR%/pseudoxml.
goto end
)
:end

24
docs/sessions.rst Normal file
View File

@@ -0,0 +1,24 @@
.. _sessions:
Specialized Sessions
====================
The toolbelt provides specialized session classes in the
:mod:`requests_toolbelt.sessions` module.
.. automodule:: requests_toolbelt.sessions
:members:
BaseUrlSession
--------------
.. versionadded:: 0.7.0
Many people have written Session subclasses that allow a "base URL" to be
specified so that all future requests need not specify the complete URL. To
provide one simple, easy-to-configure version, we've added the
:class:`requests_toolbelt.sessions.BaseUrlSession` object to the Toolbelt.
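A minimal sketch of its use (the host is illustrative):

.. code-block:: python

    from requests_toolbelt import sessions

    s = sessions.BaseUrlSession(base_url='https://api.example.com/')
    r = s.get('resource/1')  # GET https://api.example.com/resource/1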
.. autoclass:: requests_toolbelt.sessions.BaseUrlSession
:members:

170
docs/threading.rst Normal file
View File

@@ -0,0 +1,170 @@
.. _threading:
Using requests with Threading
=============================
.. versionadded:: 0.4.0
The toolbelt provides a simple API for using requests with threading.
A requests Session is documented as thread-safe, but there are still a couple
of corner cases where it isn't perfectly so. The best way to use a Session is
to use one per thread.
The implementation provided by the toolbelt is naïve. This means that we use
one session per thread and we make no effort to synchronize attributes (e.g.,
authentication, cookies, etc.). It also means that we make no attempt to
direct a request to a session that has already handled a request to the same
domain. In other words, if you're making requests to multiple domains, the
toolbelt's Pool will not try to route requests for the same domain to the
same thread.
This module provides three classes:
- :class:`~requests_toolbelt.threaded.pool.Pool`
- :class:`~requests_toolbelt.threaded.pool.ThreadResponse`
- :class:`~requests_toolbelt.threaded.pool.ThreadException`
In 98% of the situations you'll want to just use a
:class:`~requests_toolbelt.threaded.pool.Pool` and you'll treat a
:class:`~requests_toolbelt.threaded.pool.ThreadResponse` as if it were a
regular :class:`requests.Response`.
Here's an example:
.. code-block:: python
# This example assumes Python 3
import queue
from requests_toolbelt.threaded import pool
jobs = queue.Queue()
urls = [
# My list of URLs to get
]
for url in urls:
    jobs.put({'method': 'GET', 'url': url})
p = pool.Pool(job_queue=jobs)
p.join_all()
for response in p.responses():
print('GET {0}. Returned {1}.'.format(response.request_kwargs['url'],
response.status_code))
This is clearly a bit underwhelming. This is why there's a short-cut class
method to create a :class:`~requests_toolbelt.threaded.pool.Pool` from a list
of URLs.
.. code-block:: python
from requests_toolbelt.threaded import pool
urls = [
# My list of URLs to get
]
p = pool.Pool.from_urls(urls)
p.join_all()
for response in p.responses():
print('GET {0}. Returned {1}.'.format(response.request_kwargs['url'],
response.status_code))
If one of the URLs in your list throws an exception, it will be accessible
from the :meth:`~Pool.exceptions` generator.
.. code-block:: python
from requests_toolbelt.threaded import pool
urls = [
# My list of URLs to get
]
p = pool.Pool.from_urls(urls)
p.join_all()
for exc in p.exceptions():
print('GET {0}. Raised {1}.'.format(exc.request_kwargs['url'],
                                    exc.exception))
If instead, you want to retry the exceptions that have been raised you can do
the following:
.. code-block:: python
from requests_toolbelt.threaded import pool
urls = [
# My list of URLs to get
]
p = pool.Pool.from_urls(urls)
p.join_all()
new_pool = pool.Pool.from_exceptions(p.exceptions())
new_pool.join_all()
Not every failed request is safe to retry automatically; you would normally
check whether a retry is appropriate before re-queueing it.
The :class:`~Pool` object takes 4 other keyword arguments:
- ``initializer``
This is a callback that will initialize things on every session created. The
callback must return the session.
- ``auth_generator``
This is a callback that is called *after* the initializer callback has
modified the session. This callback must also return the session.
- ``num_processes``
  A positive integer indicating how many threads to use. It is ``None`` by
  default, in which case the result of ``multiprocessing.cpu_count()`` will
  be used.
- ``session``
  You can pass an alternative constructor or any callable that returns a
  :class:`requests.Session`-like object. It will not be passed any arguments
  because a :class:`requests.Session` does not accept any arguments.
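Here is a sketch tying these together; it assumes ``urls`` is defined as in
the earlier examples and that ``from_urls`` forwards extra keyword arguments
to the :class:`~Pool` constructor:

.. code-block:: python

    from requests_toolbelt.threaded import pool

    def initialize(session):
        # Called once per thread-local session; must return the session.
        session.headers['User-Agent'] = 'my-client/0.1'
        return session

    p = pool.Pool.from_urls(urls, initializer=initialize, num_processes=4)
    p.join_all()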
Finally, if you don't want to worry about Queue or Pool management, you can
try the following:
.. code-block:: python
from requests_toolbelt import threaded
requests = [{
'method': 'GET',
'url': 'https://httpbin.org/get',
# ...
}, {
# ...
}, {
# ...
}]
responses_generator, exceptions_generator = threaded.map(requests)
for response in responses_generator:
    print(response.status_code)  # do something with each response
API and Module Auto-Generated Documentation
-------------------------------------------
.. automodule:: requests_toolbelt.threaded
.. autoclass:: requests_toolbelt.threaded.pool.Pool
:members:
.. autoclass:: requests_toolbelt.threaded.pool.ThreadResponse
:members:
.. autoclass:: requests_toolbelt.threaded.pool.ThreadException
:members:

172
docs/uploading-data.rst Normal file
View File

@@ -0,0 +1,172 @@
.. _uploading-data:
Uploading Data
==============
Streaming Multipart Data Encoder
--------------------------------
Requests has `support for multipart uploads`_, but the API means that using
that functionality to build exactly the Multipart upload you want can be
difficult or impossible. Additionally, when using Requests' Multipart upload
functionality all the data must be read into memory before being sent to the
server. In extreme cases, this can make it impossible to send a file as part of
a ``multipart/form-data`` upload.
The toolbelt contains a class that allows you to build multipart request bodies
in exactly the format you need, and to avoid reading files into memory. Here
is an example of how to use it:
.. code-block:: python
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
m = MultipartEncoder(
fields={'field0': 'value', 'field1': 'value',
'field2': ('filename', open('file.py', 'rb'), 'text/plain')}
)
r = requests.post('http://httpbin.org/post', data=m,
headers={'Content-Type': m.content_type})
The :class:`~requests_toolbelt.multipart.encoder.MultipartEncoder` has the
``.to_string()`` convenience method, as well. This method renders the
multipart body into a string. This is useful when developing your code,
allowing you to confirm that the multipart body has the form you expect before
you send it on.
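For example:

.. code-block:: python

    from requests_toolbelt.multipart.encoder import MultipartEncoder

    m = MultipartEncoder(fields={'field0': 'value'})
    print(m.to_string())  # renders the complete multipart body for inspection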
The toolbelt also provides a way to monitor your streaming uploads with
the :class:`~requests_toolbelt.multipart.encoder.MultipartEncoderMonitor`.
.. autoclass:: requests_toolbelt.multipart.encoder.MultipartEncoder
.. _support for multipart uploads: http://docs.python-requests.org/en/latest/user/quickstart/#post-a-multipart-encoded-file
Monitoring Your Streaming Multipart Upload
------------------------------------------
If you need to stream your ``multipart/form-data`` upload then you're probably
in the situation where it might take a while to upload the content. In these
cases, it might make sense to be able to monitor the progress of the upload.
For this reason, the toolbelt provides the
:class:`~requests_toolbelt.multipart.encoder.MultipartEncoderMonitor`. The
monitor wraps an instance of a
:class:`~requests_toolbelt.multipart.encoder.MultipartEncoder` and is used
exactly like the encoder. It provides a similar API with some additions:
- The monitor accepts a function as a callback. The function is called every
time ``requests`` calls ``read`` on the monitor and passes in the monitor as
an argument.
- The monitor tracks how many bytes have been read in the course of the
upload.
You might use the monitor to create a progress bar for the upload. Here is `an
example using clint`_ which displays the progress bar.
To use the monitor you would follow a pattern like this:
.. code-block:: python
import requests
from requests_toolbelt.multipart import encoder
def my_callback(monitor):
# Your callback function
pass
e = encoder.MultipartEncoder(
fields={'field0': 'value', 'field1': 'value',
'field2': ('filename', open('file.py', 'rb'), 'text/plain')}
)
m = encoder.MultipartEncoderMonitor(e, my_callback)
r = requests.post('http://httpbin.org/post', data=m,
headers={'Content-Type': m.content_type})
If you have a very simple use case you can also do:
.. code-block:: python
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoderMonitor
def my_callback(monitor):
# Your callback function
pass
m = MultipartEncoderMonitor.from_fields(
fields={'field0': 'value', 'field1': 'value',
'field2': ('filename', open('file.py', 'rb'), 'text/plain')},
callback=my_callback
)
r = requests.post('http://httpbin.org/post', data=m,
headers={'Content-Type': m.content_type})
.. autoclass:: requests_toolbelt.multipart.encoder.MultipartEncoderMonitor
.. _an example using clint:
https://gitlab.com/sigmavirus24/toolbelt/blob/master/examples/monitor/progress_bar.py
Streaming Data from a Generator
-------------------------------
There are cases where you, the user, have a generator of some large quantity
of data and you already know the size of that data. If you pass the generator
to ``requests`` via the ``data`` parameter, ``requests`` will assume that you
want to upload the data in chunks and set a ``Transfer-Encoding`` header value
of ``chunked``. Oftentimes, this causes the server to behave poorly. If you
want to avoid this, you can use the
:class:`~requests_toolbelt.streaming_iterator.StreamingIterator`. You pass it
the size of the data and the generator.
.. code-block:: python
import requests
from requests_toolbelt.streaming_iterator import StreamingIterator
generator = some_function()        # Create your generator
size = some_function_size()        # Get your generator's size
content_type = get_content_type()  # Get the content-type of the data
streamer = StreamingIterator(size, generator)
r = requests.post('https://httpbin.org/post', data=streamer,
headers={'Content-Type': content_type})
The streamer will handle your generator for you and buffer the data before
passing it to ``requests``.
.. versionchanged:: 0.4.0
File-like objects can be passed instead of a generator.
If, for example, you need to upload data being piped into standard input, you
might otherwise do:
.. code-block:: python
import requests
import sys
r = requests.post(url, data=sys.stdin)
This would stream the data but would use a chunked transfer-encoding. If
instead, you know the length of the data that is being sent to ``stdin`` and
you want to prevent the data from being uploaded in chunks, you can use the
:class:`~requests_toolbelt.streaming_iterator.StreamingIterator` to stream the
contents of the file without relying on chunking.
.. code-block:: python
import sys
import requests
from requests_toolbelt.streaming_iterator import StreamingIterator
# size, url, and content_type are assumed to be known already
stream = StreamingIterator(size, sys.stdin)
r = requests.post(url, data=stream,
headers={'Content-Type': content_type})
.. autoclass:: requests_toolbelt.streaming_iterator.StreamingIterator

94
docs/user-agent.rst Normal file
View File

@@ -0,0 +1,94 @@
.. _user-agent:
User-Agent Constructor
======================
Having well-formed user-agent strings is important for the proper functioning
of the web. Make server administrators happy by generating yourself a nice
user-agent string, just like Requests does! The output of the user-agent
generator looks like this::
>>> import requests_toolbelt
>>> requests_toolbelt.user_agent('mypackage', '0.0.1')
'mypackage/0.0.1 CPython/2.7.5 Darwin/13.0.0'
The Python type and version, and the platform type and version, will accurately
reflect the system that your program is running on. You can drop this easily
into your program like this::
from requests_toolbelt import user_agent
from requests import Session
s = Session()
s.headers = {
'User-Agent': user_agent('my_package', '0.0.1')
}
r = s.get('https://api.github.com/users')
This will override the default Requests user-agent string for all of your HTTP
requests, replacing it with your own.
Adding Extra Information to Your User-Agent String
--------------------------------------------------
.. versionadded:: 0.5.0
If you feel it necessary, you can also include versions for other things that
your client is using. For example, if you were building a package and wanted to
include the package name and version number, as well as the versions of requests
and requests-toolbelt you were using, you could do the following:
.. code-block:: python
import requests
import requests_toolbelt
from requests_toolbelt.utils import user_agent as ua
user_agent = ua.user_agent('mypackage', '0.0.1',
extras=[('requests', requests.__version__),
('requests-toolbelt', requests_toolbelt.__version__)])
s = requests.Session()
s.headers['User-Agent'] = user_agent
Your user agent will now look like::
mypackage/0.0.1 requests/2.7.0 requests-toolbelt/0.5.0 CPython/2.7.10 Darwin/13.0.0
Selecting Only What You Want
----------------------------
.. versionadded:: 0.8.0
While most people will find the ``user_agent`` function sufficient for their
usage, others will want to control exactly what information is included in the
User-Agent. For those people, the
:class:`~requests_toolbelt.utils.user_agent.UserAgentBuilder` is the correct
tool. This is the tool that the toolbelt uses inside of
:func:`~requests_toolbelt.utils.user_agent.user_agent`. For example, let's say
you *only* want your package, its versions, and some extra information, in
that case you would do:
.. code-block:: python
import requests
from requests_toolbelt.utils import user_agent as ua
s = requests.Session()
s.headers['User-Agent'] = ua.UserAgentBuilder(
'mypackage', '0.0.1',
).include_extras([
('requests', requests.__version__),
]).build()
Your user agent will now look like::
mypackage/0.0.1 requests/2.7.0
You can also optionally include the Python version information and System
information the same way that our ``user_agent`` function does.
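For instance, assuming the builder's ``include_implementation`` and
``include_system`` methods (which mirror what ``user_agent`` adds by
default), a sketch looks like:

.. code-block:: python

    from requests_toolbelt.utils import user_agent as ua

    ua_string = (
        ua.UserAgentBuilder('mypackage', '0.0.1')
          .include_implementation()  # e.g. CPython/2.7.10
          .include_system()          # e.g. Darwin/13.0.0
          .build()
    )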
.. autoclass:: requests_toolbelt.utils.user_agent.UserAgentBuilder
:members:

3
docs/user.rst Normal file
View File

@@ -0,0 +1,3 @@
.. _user:
.. include:: ../README.rst

434
requests_toolbelt.egg-info/PKG-INFO Normal file
View File

@@ -0,0 +1,434 @@
Metadata-Version: 1.1
Name: requests-toolbelt
Version: 0.8.0
Summary: A utility belt for advanced users of python-requests
Home-page: https://toolbelt.readthedocs.org
Author: Ian Cordasco, Cory Benfield
Author-email: graffatcolmingov@gmail.com
License: Apache 2.0
Description: requests toolbelt
=================
This is just a collection of utilities for `python-requests`_ that don't
really belong in ``requests`` proper. The minimum tested requests version is
``2.1.0``. In reality, the toolbelt should work with ``2.0.1`` as well, but
some idiosyncrasies prevent effective or sane testing on that version.
``pip install requests-toolbelt`` to get started!
multipart/form-data Encoder
---------------------------
The main attraction is a streaming multipart form-data object, ``MultipartEncoder``.
Its API looks like this:
.. code-block:: python
from requests_toolbelt import MultipartEncoder
import requests
m = MultipartEncoder(
fields={'field0': 'value', 'field1': 'value',
'field2': ('filename', open('file.py', 'rb'), 'text/plain')}
)
r = requests.post('http://httpbin.org/post', data=m,
headers={'Content-Type': m.content_type})
You can also use ``multipart/form-data`` encoding for requests that don't
require files:
.. code-block:: python
from requests_toolbelt import MultipartEncoder
import requests
m = MultipartEncoder(fields={'field0': 'value', 'field1': 'value'})
r = requests.post('http://httpbin.org/post', data=m,
headers={'Content-Type': m.content_type})
Or, you can just create the string and examine the data:
.. code-block:: python
# Assuming `m` is one of the above
m.to_string() # Always returns unicode
User-Agent constructor
----------------------
You can easily construct a requests-style ``User-Agent`` string::
import requests
from requests_toolbelt import user_agent
headers = {
'User-Agent': user_agent('my_package', '0.0.1')
}
r = requests.get('https://api.github.com/users', headers=headers)
SSLAdapter
----------
The ``SSLAdapter`` was originally published on `Cory Benfield's blog`_.
This adapter allows the user to choose one of the SSL protocols made available
in Python's ``ssl`` module for outgoing HTTPS connections:
.. code-block:: python
from requests_toolbelt import SSLAdapter
import requests
import ssl
s = requests.Session()
s.mount('https://', SSLAdapter(ssl.PROTOCOL_TLSv1))
cookies/ForgetfulCookieJar
--------------------------
The ``ForgetfulCookieJar`` prevents a particular requests session from storing
cookies:
.. code-block:: python
import requests
from requests_toolbelt.cookies.forgetful import ForgetfulCookieJar
session = requests.Session()
session.cookies = ForgetfulCookieJar()
Known Issues
------------
On Python 3.3.0 and 3.3.1, the standard library's ``http`` module will fail
when passing an instance of the ``MultipartEncoder``. This is fixed in later
minor releases of Python 3.3. Please consider upgrading to a later minor
version or Python 3.4. *There is absolutely nothing this library can do to
work around that bug.*
Contributing
------------
Please read the `suggested workflow
<https://toolbelt.readthedocs.org/en/latest/contributing.html>`_ for
contributing to this project.
.. _Cory Benfield's blog: https://lukasa.co.uk/2013/01/Choosing_SSL_Version_In_Requests/
.. _python-requests: https://github.com/kennethreitz/requests
History
=======
0.8.0 -- 2017-05-20
-------------------
More information about this release can be found on the `0.8.0 milestone`_.
New Features
~~~~~~~~~~~~
- Add ``UserAgentBuilder`` to provide more control over generated User-Agent
strings.
Fixed Bugs
~~~~~~~~~~
- Include ``_validate_certificate`` in the list of pickled attributes on the
  ``AppEngineAdapter``.
- Fix backwards incompatibility in ``get_encodings_from_content``
.. _0.8.0 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestones/0.8.0
0.7.1 -- 2017-02-13
-------------------
More information about this release can be found on the `0.7.1 milestone`_.
Fixed Bugs
~~~~~~~~~~
- Fixed monkey-patching for the AppEngineAdapter.
- Make it easier to disable certificate verification when monkey-patching
AppEngine.
- Handle ``multipart/form-data`` bodies without a trailing ``CRLF``.
.. links
.. _0.7.1 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestone/9
0.7.0 -- 2016-07-21
-------------------
More information about this release can be found on the `0.7.0 milestone`_.
New Features
~~~~~~~~~~~~
- Add ``BaseUrlSession`` to allow developers to have a session that has a
"Base" URL. See the documentation for more details and examples.
- Split the logic of ``stream_response_to_file`` into two separate functions:
* ``get_download_file_path`` to generate the file name from the Response.
* ``stream_response_to_file`` which will use ``get_download_file_path`` if
necessary
Fixed Bugs
~~~~~~~~~~
- Fixed the issue for people using *very* old versions of Requests where they
would see an ImportError from ``requests_toolbelt._compat`` when trying to
import ``connection``.
.. _0.7.0 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestones/0.7.0
0.6.2 -- 2016-05-10
-------------------
Fixed Bugs
~~~~~~~~~~
- When passing a timeout via Requests, it was not appropriately translated to
the timeout that the urllib3 code was expecting.
0.6.1 -- 2016-05-05
-------------------
Fixed Bugs
~~~~~~~~~~
- Remove assertion about request URLs in the AppEngineAdapter.
- Prevent pip from installing requests 3.0.0 when that is released until we
are ready to handle it.
0.6.0 -- 2016-01-27
-------------------
More information about this release can be found on the `0.6.0 milestone`_.
New Features
~~~~~~~~~~~~
- Add ``AppEngineAdapter`` to support developers using Google's AppEngine
platform with Requests.
- Add ``GuessProxyAuth`` class to support guessing between Basic and Digest
Authentication for proxies.
Fixed Bugs
~~~~~~~~~~
- Ensure that proxies use the correct TLS version when using the
``SSLAdapter``.
- Fix an ``AttributeError`` when using the ``HTTPProxyDigestAuth`` class.
Miscellaneous
~~~~~~~~~~~~~
- Drop testing support for Python 3.2. virtualenv and pip have stopped
supporting it meaning that it is harder to test for this with our CI
infrastructure. Moving forward we will make a best-effort attempt to
support 3.2 but will not test for it.
.. _0.6.0 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestones/0.6.0
0.5.1 -- 2015-12-16
-------------------
More information about this release can be found on the `0.5.1 milestone`_.
Fixed Bugs
~~~~~~~~~~
- Now papers over the differences in requests' ``super_len`` function from
versions prior to 2.9.0 and versions 2.9.0 and later.
.. _0.5.1 milestone:
https://github.com/sigmavirus24/requests-toolbelt/milestones/0.5.1
0.5.0 -- 2015-11-24
-------------------
More information about this release can be found on the `milestone
<https://github.com/sigmavirus24/requests-toolbelt/issues?utf8=%E2%9C%93&q=is%3Aall+milestone%3A0.5+>`_
for 0.5.0.
New Features
~~~~~~~~~~~~
- The ``tee`` submodule was added to ``requests_toolbelt.downloadutils``. It
allows you to iterate over the bytes of a response while also writing them
to a file. The ``tee.tee`` function, expects you to pass an open file
object, while ``tee.tee_to_file`` will use the provided file name to open
the file for you.
- Added a new parameter to ``requests_toolbelt.utils.user_agent`` that allows
the user to specify additional items.
- Added nested form-data helper,
``requests_toolbelt.utils.formdata.urlencode``.
- Added the ``ForgetfulCookieJar`` to ``requests_toolbelt.cookies``.
- Added utilities for dumping the information about a request-response cycle
in ``requests_toolbelt.utils.dump``.
- Implemented the API described in the ``requests_toolbelt.threaded`` module
docstring, i.e., added ``requests_toolbelt.threaded.map`` as an available
function.
Fixed Bugs
~~~~~~~~~~
- Now papers over the API differences in versions of requests installed from
system packages versus versions of requests installed from PyPI.
- Allow string types for ``SourceAddressAdapter``.
0.4.0 -- 2015-04-03
-------------------
For more information about this release, please see `milestone 0.4.0
<https://github.com/sigmavirus24/requests-toolbelt/issues?q=milestone%3A0.4>`_
on the project's page.
New Features
~~~~~~~~~~~~
- A naive implementation of a thread pool is now included in the toolbelt. See
the docs in ``docs/threading.rst`` or on `Read The Docs
<https://toolbelt.readthedocs.org>`_.
- The ``StreamingIterator`` now accepts files (such as ``sys.stdin``) without
a specific length and will properly stream them.
- The ``MultipartEncoder`` now accepts exactly the same format of fields as
requests' ``files`` parameter does. In other words, you can now also pass in
extra headers to add to a part in the body. You can also now specify a
custom ``Content-Type`` for a part.
- An implementation of HTTP Digest Authentication for Proxies is now included.
- A transport adapter that allows a user to specify a specific Certificate
Fingerprint is now included in the toolbelt.
- A transport adapter that simplifies how users specify socket options is now
included.
- A transport adapter that simplifies how users can specify TCP Keep-Alive
options is now included in the toolbelt.
- Deprecated functions from ``requests.utils`` are now included and
maintained.
- An authentication tool that allows users to specify how to authenticate to
several different domains at once is now included.
- A function to save streamed responses to disk by analyzing the
``Content-Disposition`` header is now included in the toolbelt.
Fixed Bugs
~~~~~~~~~~
- The ``MultipartEncoder`` will now allow users to upload files larger than
4GB on 32-bit systems.
- The ``MultipartEncoder`` will now accept empty unicode strings for form
values.
0.3.1 -- 2014-06-23
-------------------
- Fix the fact that 0.3.0 bundle did not include the ``StreamingIterator``
0.3.0 -- 2014-05-21
-------------------
Bug Fixes
~~~~~~~~~
- Complete rewrite of ``MultipartEncoder`` fixes bug where bytes were lost in
uploads
New Features
~~~~~~~~~~~~
- ``MultipartDecoder`` to accept ``multipart/form-data`` response bodies and
parse them into an easy to use object.
- ``SourceAddressAdapter`` to allow users to choose a local address to bind
connections to.
- ``GuessAuth`` which accepts a username and password and uses the
``WWW-Authenticate`` header to determine how to authenticate against a
server.
- ``MultipartEncoderMonitor`` wraps an instance of the ``MultipartEncoder``
and keeps track of how many bytes were read and will call the provided
callback.
- ``StreamingIterator`` will wrap an iterator and stream the upload instead of
chunk it, provided you also provide the length of the content you wish to
upload.
0.2.0 -- 2014-02-24
-------------------
- Add ability to tell ``MultipartEncoder`` which encoding to use. By default
it uses 'utf-8'.
- Fix #10 - allow users to install with pip
- Fix #9 - Fix ``MultipartEncoder#to_string`` so that it properly handles file
objects as fields
0.1.2 -- 2014-01-19
-------------------
- At some point during development we broke how we handle normal file objects.
Thanks to @konomae this is now fixed.
0.1.1 -- 2014-01-19
-------------------
- Handle ``io.BytesIO``-like objects better
0.1.0 -- 2014-01-18
-------------------
- Add initial implementation of the streaming ``MultipartEncoder``
- Add initial implementation of the ``user_agent`` function
- Add the ``SSLAdapter``
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: Implementation :: CPython

99
requests_toolbelt.egg-info/SOURCES.txt Normal file
View File

@@ -0,0 +1,99 @@
AUTHORS.rst
CODE_OF_CONDUCT.rst
HISTORY.rst
LICENSE
MANIFEST.in
README.rst
dev-requirements.txt
setup.cfg
setup.py
tox.ini
docs/Makefile
docs/adapters.rst
docs/authentication.rst
docs/conf.py
docs/contributing.rst
docs/deprecated.rst
docs/downloadutils.rst
docs/dumputils.rst
docs/exceptions.rst
docs/formdata.rst
docs/index.rst
docs/make.bat
docs/sessions.rst
docs/threading.rst
docs/uploading-data.rst
docs/user-agent.rst
docs/user.rst
requests_toolbelt/__init__.py
requests_toolbelt/_compat.py
requests_toolbelt/exceptions.py
requests_toolbelt/sessions.py
requests_toolbelt/streaming_iterator.py
requests_toolbelt.egg-info/PKG-INFO
requests_toolbelt.egg-info/SOURCES.txt
requests_toolbelt.egg-info/dependency_links.txt
requests_toolbelt.egg-info/requires.txt
requests_toolbelt.egg-info/top_level.txt
requests_toolbelt/adapters/__init__.py
requests_toolbelt/adapters/appengine.py
requests_toolbelt/adapters/fingerprint.py
requests_toolbelt/adapters/host_header_ssl.py
requests_toolbelt/adapters/socket_options.py
requests_toolbelt/adapters/source.py
requests_toolbelt/adapters/ssl.py
requests_toolbelt/auth/__init__.py
requests_toolbelt/auth/_digest_auth_compat.py
requests_toolbelt/auth/guess.py
requests_toolbelt/auth/handler.py
requests_toolbelt/auth/http_proxy_digest.py
requests_toolbelt/cookies/__init__.py
requests_toolbelt/cookies/forgetful.py
requests_toolbelt/downloadutils/__init__.py
requests_toolbelt/downloadutils/stream.py
requests_toolbelt/downloadutils/tee.py
requests_toolbelt/multipart/__init__.py
requests_toolbelt/multipart/decoder.py
requests_toolbelt/multipart/encoder.py
requests_toolbelt/threaded/__init__.py
requests_toolbelt/threaded/pool.py
requests_toolbelt/threaded/thread.py
requests_toolbelt/utils/__init__.py
requests_toolbelt/utils/deprecated.py
requests_toolbelt/utils/dump.py
requests_toolbelt/utils/formdata.py
requests_toolbelt/utils/user_agent.py
tests/__init__.py
tests/conftest.py
tests/test_appengine_adapter.py
tests/test_auth.py
tests/test_auth_handler.py
tests/test_downloadutils.py
tests/test_dump.py
tests/test_fingerprintadapter.py
tests/test_forgetfulcookiejar.py
tests/test_formdata.py
tests/test_host_header_ssl_adapter.py
tests/test_multipart_decoder.py
tests/test_multipart_encoder.py
tests/test_multipart_monitor.py
tests/test_proxy_digest_auth.py
tests/test_sessions.py
tests/test_socket_options_adapter.py
tests/test_source_adapter.py
tests/test_ssladapter.py
tests/test_streaming_iterator.py
tests/test_user_agent.py
tests/cassettes/http2bin_cookies.json
tests/cassettes/http2bin_fingerprint.json
tests/cassettes/httpbin_guess_auth_basic.json
tests/cassettes/httpbin_guess_auth_digest.json
tests/cassettes/httpbin_guess_auth_none.json
tests/cassettes/klevas_vu_lt_ssl3.json
tests/cassettes/redirect_request_for_dump_all.json
tests/cassettes/simple_get_request.json
tests/cassettes/stream_response_to_file.json
tests/threaded/__init__.py
tests/threaded/test_api.py
tests/threaded/test_pool.py
tests/threaded/test_thread.py

1
requests_toolbelt.egg-info/dependency_links.txt Normal file
View File

@@ -0,0 +1 @@

1
requests_toolbelt.egg-info/requires.txt Normal file
View File

@@ -0,0 +1 @@
requests>=2.0.1,<3.0.0

1
requests_toolbelt.egg-info/top_level.txt Normal file
View File

@@ -0,0 +1 @@
requests_toolbelt

34
requests_toolbelt/__init__.py Normal file
View File

@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
"""
requests-toolbelt
=================
See http://toolbelt.rtfd.org/ for documentation
:copyright: (c) 2014 by Ian Cordasco and Cory Benfield
:license: Apache v2.0, see LICENSE for more details
"""
from .adapters import SSLAdapter, SourceAddressAdapter
from .auth.guess import GuessAuth
from .multipart import (
MultipartEncoder, MultipartEncoderMonitor, MultipartDecoder,
ImproperBodyPartContentException, NonMultipartContentTypeException
)
from .streaming_iterator import StreamingIterator
from .utils.user_agent import user_agent
__title__ = 'requests-toolbelt'
__authors__ = 'Ian Cordasco, Cory Benfield'
__license__ = 'Apache v2.0'
__copyright__ = 'Copyright 2014 Ian Cordasco, Cory Benfield'
__version__ = '0.8.0'
__version_info__ = tuple(int(i) for i in __version__.split('.'))
__all__ = [
'GuessAuth', 'MultipartEncoder', 'MultipartEncoderMonitor',
'MultipartDecoder', 'SSLAdapter', 'SourceAddressAdapter',
'StreamingIterator', 'user_agent', 'ImproperBodyPartContentException',
'NonMultipartContentTypeException', '__title__', '__authors__',
'__license__', '__copyright__', '__version__', '__version_info__',
]

310
requests_toolbelt/_compat.py Normal file
View File

@@ -0,0 +1,310 @@
"""Private module full of compatibility hacks.
Primarily this is for downstream redistributions of requests that unvendor
urllib3 without providing a shim.
.. warning::
This module is private. If you use it, and something breaks, you were
warned
"""
from collections import Mapping, MutableMapping
import sys
import requests
try:
from requests.packages.urllib3 import fields
from requests.packages.urllib3 import filepost
from requests.packages.urllib3 import poolmanager
except ImportError:
from urllib3 import fields
from urllib3 import filepost
from urllib3 import poolmanager
try:
from requests.packages.urllib3.connection import HTTPConnection
from requests.packages.urllib3 import connection
except ImportError:
try:
from urllib3.connection import HTTPConnection
from urllib3 import connection
except ImportError:
HTTPConnection = None
connection = None
if requests.__build__ < 0x020300:
timeout = None
else:
try:
from requests.packages.urllib3.util import timeout
except ImportError:
from urllib3.util import timeout
if requests.__build__ < 0x021000:
gaecontrib = None
else:
try:
from requests.packages.urllib3.contrib import appengine as gaecontrib
except ImportError:
from urllib3.contrib import appengine as gaecontrib
PY3 = sys.version_info > (3, 0)
if PY3:
import queue
from urllib.parse import urlencode, urljoin
else:
import Queue as queue
from urllib import urlencode
from urlparse import urljoin
try:
basestring = basestring
except NameError:
basestring = (str, bytes)
class HTTPHeaderDict(MutableMapping):
"""
:param headers:
An iterable of field-value pairs. Must not contain multiple field names
when compared case-insensitively.
:param kwargs:
Additional field-value pairs to pass in to ``dict.update``.
A ``dict`` like container for storing HTTP Headers.
Field names are stored and compared case-insensitively in compliance with
RFC 7230. Iteration provides the first case-sensitive key seen for each
case-insensitive pair.
Using ``__setitem__`` syntax overwrites fields that compare equal
case-insensitively in order to maintain ``dict``'s api. For fields that
compare equal, instead create a new ``HTTPHeaderDict`` and use ``.add``
in a loop.
If multiple fields that are equal case-insensitively are passed to the
constructor or ``.update``, the behavior is undefined and some will be
lost.
>>> headers = HTTPHeaderDict()
>>> headers.add('Set-Cookie', 'foo=bar')
>>> headers.add('set-cookie', 'baz=quxx')
>>> headers['content-length'] = '7'
>>> headers['SET-cookie']
'foo=bar, baz=quxx'
>>> headers['Content-Length']
'7'
"""
def __init__(self, headers=None, **kwargs):
super(HTTPHeaderDict, self).__init__()
self._container = {}
if headers is not None:
if isinstance(headers, HTTPHeaderDict):
self._copy_from(headers)
else:
self.extend(headers)
if kwargs:
self.extend(kwargs)
def __setitem__(self, key, val):
self._container[key.lower()] = (key, val)
return self._container[key.lower()]
def __getitem__(self, key):
val = self._container[key.lower()]
return ', '.join(val[1:])
def __delitem__(self, key):
del self._container[key.lower()]
def __contains__(self, key):
return key.lower() in self._container
def __eq__(self, other):
if not isinstance(other, Mapping) and not hasattr(other, 'keys'):
return False
if not isinstance(other, type(self)):
other = type(self)(other)
return (dict((k.lower(), v) for k, v in self.itermerged()) ==
dict((k.lower(), v) for k, v in other.itermerged()))
def __ne__(self, other):
return not self.__eq__(other)
if not PY3: # Python 2
iterkeys = MutableMapping.iterkeys
itervalues = MutableMapping.itervalues
__marker = object()
def __len__(self):
return len(self._container)
def __iter__(self):
# Only provide the originally cased names
for vals in self._container.values():
yield vals[0]
def pop(self, key, default=__marker):
"""D.pop(k[,d]) -> v, remove specified key and return its value.
If key is not found, d is returned if given, otherwise KeyError is
raised.
"""
# Using the MutableMapping function directly fails due to the private
# marker.
# Using ordinary dict.pop would expose the internal structures.
# So let's reinvent the wheel.
try:
value = self[key]
except KeyError:
if default is self.__marker:
raise
return default
else:
del self[key]
return value
def discard(self, key):
try:
del self[key]
except KeyError:
pass
def add(self, key, val):
"""Adds a (name, value) pair, doesn't overwrite the value if it already
exists.
>>> headers = HTTPHeaderDict(foo='bar')
>>> headers.add('Foo', 'baz')
>>> headers['foo']
'bar, baz'
"""
key_lower = key.lower()
new_vals = key, val
# Keep the common case aka no item present as fast as possible
vals = self._container.setdefault(key_lower, new_vals)
if new_vals is not vals:
# new_vals was not inserted, as there was a previous one
if isinstance(vals, list):
# If already several items got inserted, we have a list
vals.append(val)
else:
# vals should be a tuple then, i.e. only one item so far
# Need to convert the tuple to list for further extension
self._container[key_lower] = [vals[0], vals[1], val]
def extend(self, *args, **kwargs):
"""Generic import function for any type of header-like object.
Adapted version of MutableMapping.update in order to insert items
with self.add instead of self.__setitem__
"""
if len(args) > 1:
raise TypeError("extend() takes at most 1 positional "
"arguments ({} given)".format(len(args)))
other = args[0] if len(args) >= 1 else ()
if isinstance(other, HTTPHeaderDict):
for key, val in other.iteritems():
self.add(key, val)
elif isinstance(other, Mapping):
for key in other:
self.add(key, other[key])
elif hasattr(other, "keys"):
for key in other.keys():
self.add(key, other[key])
else:
for key, value in other:
self.add(key, value)
for key, value in kwargs.items():
self.add(key, value)
def getlist(self, key):
"""Returns a list of all the values for the named field. Returns an
empty list if the key doesn't exist."""
try:
vals = self._container[key.lower()]
except KeyError:
return []
else:
if isinstance(vals, tuple):
return [vals[1]]
else:
return vals[1:]
# Backwards compatibility for httplib
getheaders = getlist
getallmatchingheaders = getlist
iget = getlist
def __repr__(self):
return "%s(%s)" % (type(self).__name__, dict(self.itermerged()))
def _copy_from(self, other):
for key in other:
val = other.getlist(key)
if isinstance(val, list):
# Don't need to convert tuples
val = list(val)
self._container[key.lower()] = [key] + val
def copy(self):
clone = type(self)()
clone._copy_from(self)
return clone
def iteritems(self):
"""Iterate over all header lines, including duplicate ones."""
for key in self:
vals = self._container[key.lower()]
for val in vals[1:]:
yield vals[0], val
def itermerged(self):
"""Iterate over all headers, merging duplicate ones together."""
for key in self:
val = self._container[key.lower()]
yield val[0], ', '.join(val[1:])
def items(self):
return list(self.iteritems())
@classmethod
def from_httplib(cls, message): # Python 2
"""Read headers from a Python 2 httplib message object."""
# python2.7 does not expose a proper API for exporting multiheaders
# efficiently. This function re-reads raw lines from the message
# object and extracts the multiheaders properly.
headers = []
for line in message.headers:
if line.startswith((' ', '\t')):
key, value = headers[-1]
headers[-1] = (key, value + '\r\n' + line.rstrip())
continue
key, value = line.split(':', 1)
headers.append((key, value.strip()))
return cls(headers)
__all__ = (
'basestring',
'connection',
'fields',
'filepost',
'poolmanager',
'timeout',
'HTTPHeaderDict',
'queue',
'urlencode',
'gaecontrib',
'urljoin',
)

15
requests_toolbelt/adapters/__init__.py Normal file
View File

@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
"""
requests-toolbelt.adapters
==========================
See http://toolbelt.rtfd.org/ for documentation
:copyright: (c) 2014 by Ian Cordasco and Cory Benfield
:license: Apache v2.0, see LICENSE for more details
"""
from .ssl import SSLAdapter
from .source import SourceAddressAdapter
__all__ = ['SSLAdapter', 'SourceAddressAdapter']

206
requests_toolbelt/adapters/appengine.py Normal file
View File

@@ -0,0 +1,206 @@
# -*- coding: utf-8 -*-
"""The App Engine Transport Adapter for requests.
.. versionadded:: 0.6.0
This requires a version of requests >= 2.10.0 and Python 2.
There are two ways to use this library:
#. If you're using requests directly, you can use code like:
.. code-block:: python
>>> import requests
>>> import ssl
>>> import requests.packages.urllib3.contrib.appengine as ul_appengine
>>> from requests_toolbelt.adapters import appengine
>>> s = requests.Session()
>>> if ul_appengine.is_appengine_sandbox():
... s.mount('http://', appengine.AppEngineAdapter())
... s.mount('https://', appengine.AppEngineAdapter())
#. If you depend on external libraries which use requests, you can use code
like:
.. code-block:: python
>>> from requests_toolbelt.adapters import appengine
>>> appengine.monkeypatch()
which will ensure all requests.Session objects use AppEngineAdapter properly.
You are also able to :ref:`disable certificate validation <insecure_appengine>`
when monkey-patching.
"""
import requests
import warnings
from requests import adapters
from requests import sessions
from .. import exceptions as exc
from .._compat import gaecontrib
from .._compat import timeout
class AppEngineMROHack(adapters.HTTPAdapter):
"""Resolves infinite recursion when monkeypatching.
This works by injecting itself as the base class of both the
:class:`AppEngineAdapter` and Requests' default HTTPAdapter, which needs to
be done because default HTTPAdapter's MRO is recompiled when we
monkeypatch, at which point this class becomes HTTPAdapter's base class.
In addition, we use an instantiation flag to avoid infinite recursion.
"""
_initialized = False
def __init__(self, *args, **kwargs):
if not self._initialized:
self._initialized = True
super(AppEngineMROHack, self).__init__(*args, **kwargs)
class AppEngineAdapter(AppEngineMROHack, adapters.HTTPAdapter):
"""The transport adapter for Requests to use urllib3's GAE support.
Implements Requests's HTTPAdapter API.
When deploying to Google's App Engine service, some of Requests'
functionality is broken. There is underlying support for GAE in urllib3.
This functionality, however, is opt-in and needs to be enabled explicitly
for Requests to be able to use it.
"""
__attrs__ = adapters.HTTPAdapter.__attrs__ + ['_validate_certificate']
def __init__(self, validate_certificate=True, *args, **kwargs):
_check_version()
self._validate_certificate = validate_certificate
super(AppEngineAdapter, self).__init__(*args, **kwargs)
def init_poolmanager(self, connections, maxsize, block=False):
self.poolmanager = _AppEnginePoolManager(self._validate_certificate)
class InsecureAppEngineAdapter(AppEngineAdapter):
"""An always-insecure GAE adapter for Requests.
This is a variant of the transport adapter for Requests to use
urllib3's GAE support that does not validate certificates. Use with
caution!
.. note::
The ``validate_certificate`` keyword argument will not be honored here
and is not part of the signature because we always force it to
``False``.
See :class:`AppEngineAdapter` for further details.
"""
def __init__(self, *args, **kwargs):
if kwargs.pop("validate_certificate", False):
warnings.warn("Certificate validation cannot be specified on the "
"InsecureAppEngineAdapter, but was present. This "
"will be ignored and certificate validation will "
"remain off.", exc.IgnoringGAECertificateValidation)
super(InsecureAppEngineAdapter, self).__init__(
validate_certificate=False, *args, **kwargs)
class _AppEnginePoolManager(object):
"""Implements urllib3's PoolManager API expected by requests.
While a real PoolManager maps hostnames to reusable Connections,
AppEngine has no concept of a reusable connection to a host.
So instead, this class constructs a small Connection per request,
that is returned to the Adapter and used to access the URL.
"""
def __init__(self, validate_certificate=True):
self.appengine_manager = gaecontrib.AppEngineManager(
validate_certificate=validate_certificate)
def connection_from_url(self, url):
return _AppEngineConnection(self.appengine_manager, url)
def clear(self):
pass
class _AppEngineConnection(object):
"""Implements urllib3's HTTPConnectionPool API's urlopen().
This Connection's urlopen() is called with a host-relative path,
so in order to properly support opening the URL, we need to store
the full URL when this Connection is constructed from the PoolManager.
This code wraps AppEngineManager.urlopen(), which exposes a different
API than in the original urllib3 urlopen(), and thus needs this adapter.
"""
def __init__(self, appengine_manager, url):
self.appengine_manager = appengine_manager
self.url = url
def urlopen(self, method, url, body=None, headers=None, retries=None,
redirect=True, assert_same_host=True,
timeout=timeout.Timeout.DEFAULT_TIMEOUT,
pool_timeout=None, release_conn=None, **response_kw):
# This function's url argument is a host-relative URL,
# but the AppEngineManager expects an absolute URL.
# So we saved out the self.url when the AppEngineConnection
# was constructed, which we then can use down below instead.
# We once tried to verify our assumptions here, but sometimes the
# passed-in URL differs on url fragments, or "http://a.com" vs "/".
# urllib3's App Engine adapter only uses Timeout.total, not read or
# connect.
if not timeout.total:
timeout.total = timeout._read or timeout._connect
# Jump through the hoops necessary to call AppEngineManager's API.
return self.appengine_manager.urlopen(
method,
self.url,
body=body,
headers=headers,
retries=retries,
redirect=redirect,
timeout=timeout,
**response_kw)
def monkeypatch(validate_certificate=True):
"""Sets up all Sessions to use AppEngineAdapter by default.
If you don't want to deal with configuring your own Sessions,
or if you use libraries that use requests directly (i.e., requests.post),
then you may prefer to monkeypatch and auto-configure all Sessions.
.. warning::
If ``validate_certificate`` is ``False``, certificate validation will
effectively be disabled for all requests.
"""
_check_version()
# HACK: We should consider modifying urllib3 to support this cleanly,
# so that we can set a module-level variable in the sessions module,
# instead of overriding an imported HTTPAdapter as is done here.
adapter = AppEngineAdapter
if not validate_certificate:
adapter = InsecureAppEngineAdapter
sessions.HTTPAdapter = adapter
adapters.HTTPAdapter = adapter
def _check_version():
if gaecontrib is None:
raise exc.VersionMismatchError(
"The toolbelt requires at least Requests 2.10.0 to be "
"installed. Version {0} was found instead.".format(
requests.__version__
)
)
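# Illustrative sketch (not part of the original module): a minimal example
# of mounting the adapter on a Session, assuming the code runs on Google
# App Engine with requests >= 2.10.0 installed; the URL is a placeholder.
#
#     import requests
#     from requests_toolbelt.adapters import appengine
#
#     s = requests.Session()
#     s.mount('http://', appengine.AppEngineAdapter())
#     s.mount('https://', appengine.AppEngineAdapter())
#     r = s.get('https://httpbin.org/get')
#
# Alternatively, appengine.monkeypatch() configures every Session globally.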

@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
"""Submodule containing the implementation for the FingerprintAdapter.
This file contains an implementation of a Transport Adapter that validates
the fingerprints of SSL certificates presented upon connection.
"""
from requests.adapters import HTTPAdapter
from .._compat import poolmanager
class FingerprintAdapter(HTTPAdapter):
"""
An HTTPS Adapter for Python Requests that verifies certificate
fingerprints instead of certificate hostnames.
Example usage:
.. code-block:: python
import requests
import ssl
from requests_toolbelt.adapters.fingerprint import FingerprintAdapter
twitter_fingerprint = '...'
s = requests.Session()
s.mount(
'https://twitter.com',
FingerprintAdapter(twitter_fingerprint)
)
The fingerprint should be provided as a hexadecimal string, optionally
containing colons.
"""
__attrs__ = HTTPAdapter.__attrs__ + ['fingerprint']
def __init__(self, fingerprint, **kwargs):
self.fingerprint = fingerprint
super(FingerprintAdapter, self).__init__(**kwargs)
def init_poolmanager(self, connections, maxsize, block=False):
self.poolmanager = poolmanager.PoolManager(
num_pools=connections,
maxsize=maxsize,
block=block,
assert_fingerprint=self.fingerprint)
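# Illustrative sketch (not part of the original module): one hedged way to
# obtain a fingerprint to pin, using only the standard library and assuming
# the host is reachable at the time you pin it.
#
#     import hashlib
#     import ssl
#
#     pem = ssl.get_server_certificate(('twitter.com', 443))
#     der = ssl.PEM_cert_to_DER_cert(pem)
#     fingerprint = hashlib.sha256(der).hexdigest()
#
# The resulting hex digest is the string FingerprintAdapter expects above.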

@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
"""
requests_toolbelt.adapters.host_header_ssl
==========================================
This file contains an implementation of the HostHeaderSSLAdapter.
"""
from requests.adapters import HTTPAdapter
class HostHeaderSSLAdapter(HTTPAdapter):
"""
An HTTPS Adapter for Python Requests that sets the hostname for certificate
verification based on the Host header.
This allows requesting the IP address directly via HTTPS without getting
a "hostname doesn't match" exception.
Example usage:
>>> import requests
>>> s = requests.Session()
>>> s.mount('https://', HostHeaderSSLAdapter())
>>> s.get("https://93.184.216.34", headers={"Host": "example.org"})
"""
def send(self, request, **kwargs):
# HTTP headers are case-insensitive (RFC 7230)
host_header = None
for header in request.headers:
if header.lower() == "host":
host_header = request.headers[header]
break
connection_pool_kwargs = self.poolmanager.connection_pool_kw
if host_header:
connection_pool_kwargs["assert_hostname"] = host_header
elif "assert_hostname" in connection_pool_kwargs:
# an assert_hostname from a previous request may have been left over
connection_pool_kwargs.pop("assert_hostname", None)
return super(HostHeaderSSLAdapter, self).send(request, **kwargs)

@@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
"""The implementation of the SocketOptionsAdapter."""
import socket
import warnings
import requests
from requests import adapters
from .._compat import connection
from .._compat import poolmanager
from .. import exceptions as exc
class SocketOptionsAdapter(adapters.HTTPAdapter):
"""An adapter for requests that allows users to specify socket options.
Since version 2.4.0 of requests, it is possible to specify a custom list
of socket options that need to be set before establishing the connection.
Example usage::
>>> import socket
>>> import requests
>>> from requests_toolbelt.adapters import socket_options
>>> s = requests.Session()
>>> opts = [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 0)]
>>> adapter = socket_options.SocketOptionsAdapter(socket_options=opts)
>>> s.mount('http://', adapter)
You can also take advantage of the list of default options on this class
to keep using the original options in addition to your custom options. In
that case, ``opts`` might look like::
>>> opts = socket_options.SocketOptionsAdapter.default_options + opts
"""
if connection is not None:
default_options = getattr(
connection.HTTPConnection,
'default_socket_options',
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
)
else:
default_options = []
# warnings.warn() expects the message first and the category second.
warnings.warn("This version of Requests is only compatible with a "
"version of urllib3 which is too old to support "
"setting options on a socket. This adapter is "
"functionally useless.", exc.RequestsVersionTooOld)
def __init__(self, **kwargs):
self.socket_options = kwargs.pop('socket_options',
self.default_options)
super(SocketOptionsAdapter, self).__init__(**kwargs)
def init_poolmanager(self, connections, maxsize, block=False):
if requests.__build__ >= 0x020400:
# NOTE(Ian): Perhaps we should raise a warning
self.poolmanager = poolmanager.PoolManager(
num_pools=connections,
maxsize=maxsize,
block=block,
socket_options=self.socket_options
)
else:
super(SocketOptionsAdapter, self).init_poolmanager(
connections, maxsize, block
)
class TCPKeepAliveAdapter(SocketOptionsAdapter):
"""An adapter for requests that turns on TCP Keep-Alive by default.
The adapter sets 4 socket options:
- ``SOL_SOCKET`` ``SO_KEEPALIVE`` - This turns on TCP Keep-Alive
- ``IPPROTO_TCP`` ``TCP_KEEPINTVL`` 20 - Sets the keep alive interval
- ``IPPROTO_TCP`` ``TCP_KEEPCNT`` 5 - Sets the number of keep alive probes
- ``IPPROTO_TCP`` ``TCP_KEEPIDLE`` 60 - Sets the keep alive time if the
socket library has the ``TCP_KEEPIDLE`` constant
The latter three can be overridden by keyword arguments (respectively):
- ``interval``
- ``count``
- ``idle``
You can use this adapter like so::
>>> import requests
>>> from requests_toolbelt.adapters import socket_options
>>> tcp = socket_options.TCPKeepAliveAdapter(idle=120, interval=10)
>>> s = requests.Session()
>>> s.mount('http://', tcp)
"""
def __init__(self, **kwargs):
socket_options = kwargs.pop('socket_options',
SocketOptionsAdapter.default_options)
idle = kwargs.pop('idle', 60)
interval = kwargs.pop('interval', 20)
count = kwargs.pop('count', 5)
socket_options = socket_options + [
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval),
(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count),
]
# NOTE(Ian): Apparently OSX does not have this constant defined, so we
# set it conditionally.
if getattr(socket, 'TCP_KEEPIDLE', None) is not None:
socket_options += [(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)]
super(TCPKeepAliveAdapter, self).__init__(
socket_options=socket_options, **kwargs
)
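# Illustrative sketch (not part of the original module): enabling TCP
# keep-alive probes for a long-polling request; the URL is a placeholder.
#
#     import requests
#     from requests_toolbelt.adapters import socket_options
#
#     tcp = socket_options.TCPKeepAliveAdapter(idle=120, interval=10, count=3)
#     s = requests.Session()
#     s.mount('http://', tcp)
#     s.mount('https://', tcp)
#     r = s.get('https://example.com/slow-endpoint', timeout=300)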

@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
"""
requests_toolbelt.source_adapter
================================
This file contains an implementation of the SourceAddressAdapter originally
demonstrated on the Requests GitHub page.
"""
from requests.adapters import HTTPAdapter
from .._compat import poolmanager, basestring
class SourceAddressAdapter(HTTPAdapter):
"""
A Source Address Adapter for Python Requests that enables you to choose the
local address to bind to. This allows you to send your HTTP requests from a
specific interface and IP address.
Two address formats are accepted. The first is a string: this will set the
local IP address to the address given in the string, and will also choose a
semi-random high port for the local port number.
The second is a two-tuple of the form (ip address, port): for example,
``('10.10.10.10', 8999)``. This will set the local IP address to the first
element, and the local port to the second element. If ``0`` is used as the
port number, a semi-random high port will be selected.
.. warning:: Setting an explicit local port can have negative interactions
with connection-pooling in Requests: in particular, it risks
the possibility of getting "Address in use" errors. The
string-only argument is generally preferred to the tuple-form.
Example usage:
.. code-block:: python
import requests
from requests_toolbelt.adapters.source import SourceAddressAdapter
s = requests.Session()
s.mount('http://', SourceAddressAdapter('10.10.10.10'))
s.mount('https://', SourceAddressAdapter(('10.10.10.10', 8999)))
"""
def __init__(self, source_address, **kwargs):
if isinstance(source_address, basestring):
self.source_address = (source_address, 0)
elif isinstance(source_address, tuple):
self.source_address = source_address
else:
raise TypeError(
"source_address must be IP address string or (ip, port) tuple"
)
super(SourceAddressAdapter, self).__init__(**kwargs)
def init_poolmanager(self, connections, maxsize, block=False):
self.poolmanager = poolmanager.PoolManager(
num_pools=connections,
maxsize=maxsize,
block=block,
source_address=self.source_address)
def proxy_manager_for(self, *args, **kwargs):
kwargs['source_address'] = self.source_address
return super(SourceAddressAdapter, self).proxy_manager_for(
*args, **kwargs)

@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
"""
requests_toolbelt.ssl_adapter
=============================
This file contains an implementation of the SSLAdapter originally demonstrated
in this blog post:
https://lukasa.co.uk/2013/01/Choosing_SSL_Version_In_Requests/
"""
import requests
from requests.adapters import HTTPAdapter
from .._compat import poolmanager
class SSLAdapter(HTTPAdapter):
"""
An HTTPS Adapter for Python Requests that allows the choice of the SSL/TLS
version negotiated by Requests. This can be used either to enforce the
choice of high-security TLS versions (where supported), or to work around
misbehaving servers that fail to correctly negotiate the default TLS
version being offered.
Example usage:
>>> import requests
>>> import ssl
>>> from requests_toolbelt import SSLAdapter
>>> s = requests.Session()
>>> s.mount('https://', SSLAdapter(ssl.PROTOCOL_TLSv1))
You can replace the chosen protocol with any that are available in the
default Python SSL module. All subsequent requests that match the adapter
prefix will use the chosen SSL version instead of the default.
This adapter will also attempt to change the SSL/TLS version negotiated by
Requests when using a proxy. However, this may not always be possible:
prior to Requests v2.4.0 the adapter did not have access to the proxy setup
code. In earlier versions of Requests, this adapter will not function
properly when used with proxies.
"""
__attrs__ = HTTPAdapter.__attrs__ + ['ssl_version']
def __init__(self, ssl_version=None, **kwargs):
self.ssl_version = ssl_version
super(SSLAdapter, self).__init__(**kwargs)
def init_poolmanager(self, connections, maxsize, block=False):
self.poolmanager = poolmanager.PoolManager(
num_pools=connections,
maxsize=maxsize,
block=block,
ssl_version=self.ssl_version)
if requests.__build__ >= 0x020400:
# Earlier versions of requests either don't have this method or, worse,
# don't allow passing arbitrary keyword arguments. As a result, only
# conditionally define this method.
def proxy_manager_for(self, *args, **kwargs):
kwargs['ssl_version'] = self.ssl_version
return super(SSLAdapter, self).proxy_manager_for(*args, **kwargs)

@@ -0,0 +1,29 @@
"""Provide a compatibility layer for requests.auth.HTTPDigestAuth."""
import requests
class _ThreadingDescriptor(object):
def __init__(self, prop, default):
self.prop = prop
self.default = default
def __get__(self, obj, objtype=None):
return getattr(obj._thread_local, self.prop, self.default)
def __set__(self, obj, value):
setattr(obj._thread_local, self.prop, value)
class _HTTPDigestAuth(requests.auth.HTTPDigestAuth):
init = _ThreadingDescriptor('init', True)
last_nonce = _ThreadingDescriptor('last_nonce', '')
nonce_count = _ThreadingDescriptor('nonce_count', 0)
chal = _ThreadingDescriptor('chal', {})
pos = _ThreadingDescriptor('pos', None)
num_401_calls = _ThreadingDescriptor('num_401_calls', 1)
if requests.__build__ < 0x020800:
HTTPDigestAuth = requests.auth.HTTPDigestAuth
else:
HTTPDigestAuth = _HTTPDigestAuth
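# Illustrative sketch (not part of the original module): the descriptor
# above proxies attribute access to per-thread storage, so each thread sees
# its own value. A standalone demonstration of the same pattern (Python 3):
#
#     import threading
#
#     class Counter(object):
#         count = _ThreadingDescriptor('count', 0)
#         def __init__(self):
#             self._thread_local = threading.local()
#
#     c = Counter()
#     c.count = 5  # visible only in the main thread
#     t = threading.Thread(target=lambda: print(c.count))  # prints 0
#     t.start()
#     t.join()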

@@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
"""The module containing the code for GuessAuth."""
from requests import auth
from requests import cookies
from . import _digest_auth_compat as auth_compat, http_proxy_digest
class GuessAuth(auth.AuthBase):
"""Guesses the auth type by the WWW-Authentication header."""
def __init__(self, username, password):
self.username = username
self.password = password
self.auth = None
self.pos = None
def _handle_basic_auth_401(self, r, kwargs):
if self.pos is not None:
r.request.body.seek(self.pos)
# Consume content and release the original connection
# to allow our new request to reuse the same one.
r.content
r.raw.release_conn()
prep = r.request.copy()
if not hasattr(prep, '_cookies'):
prep._cookies = cookies.RequestsCookieJar()
cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
prep.prepare_cookies(prep._cookies)
self.auth = auth.HTTPBasicAuth(self.username, self.password)
prep = self.auth(prep)
_r = r.connection.send(prep, **kwargs)
_r.history.append(r)
_r.request = prep
return _r
def _handle_digest_auth_401(self, r, kwargs):
self.auth = auth_compat.HTTPDigestAuth(self.username, self.password)
try:
self.auth.init_per_thread_state()
except AttributeError:
# If we're not on requests 2.8.0+ this method does not exist and
# is not relevant.
pass
# Check that the attr exists because much older versions of requests
# set this attribute lazily. For example:
# https://github.com/kennethreitz/requests/blob/33735480f77891754304e7f13e3cdf83aaaa76aa/requests/auth.py#L59
if (hasattr(self.auth, 'num_401_calls') and
self.auth.num_401_calls is None):
self.auth.num_401_calls = 1
# Digest auth would resend the request by itself. We can take a
# shortcut here.
return self.auth.handle_401(r, **kwargs)
def handle_401(self, r, **kwargs):
"""Resends a request with auth headers, if needed."""
www_authenticate = r.headers.get('www-authenticate', '').lower()
if 'basic' in www_authenticate:
return self._handle_basic_auth_401(r, kwargs)
if 'digest' in www_authenticate:
return self._handle_digest_auth_401(r, kwargs)
def __call__(self, request):
if self.auth is not None:
return self.auth(request)
try:
self.pos = request.body.tell()
except AttributeError:
pass
request.register_hook('response', self.handle_401)
return request
class GuessProxyAuth(GuessAuth):
"""
Guesses the auth type by the WWW-Authenticate and Proxy-Authenticate
headers
"""
def __init__(self, username=None, password=None,
proxy_username=None, proxy_password=None):
super(GuessProxyAuth, self).__init__(username, password)
self.proxy_username = proxy_username
self.proxy_password = proxy_password
self.proxy_auth = None
def _handle_basic_auth_407(self, r, kwargs):
if self.pos is not None:
r.request.body.seek(self.pos)
r.content
r.raw.release_conn()
prep = r.request.copy()
if not hasattr(prep, '_cookies'):
prep._cookies = cookies.RequestsCookieJar()
cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
prep.prepare_cookies(prep._cookies)
self.proxy_auth = auth.HTTPProxyAuth(self.proxy_username,
self.proxy_password)
prep = self.proxy_auth(prep)
_r = r.connection.send(prep, **kwargs)
_r.history.append(r)
_r.request = prep
return _r
def _handle_digest_auth_407(self, r, kwargs):
self.proxy_auth = http_proxy_digest.HTTPProxyDigestAuth(
username=self.proxy_username,
password=self.proxy_password)
try:
self.auth.init_per_thread_state()
except AttributeError:
pass
return self.proxy_auth.handle_407(r, **kwargs)
def handle_407(self, r, **kwargs):
proxy_authenticate = r.headers.get('Proxy-Authenticate', '').lower()
if 'basic' in proxy_authenticate:
return self._handle_basic_auth_407(r, kwargs)
if 'digest' in proxy_authenticate:
return self._handle_digest_auth_407(r, kwargs)
def __call__(self, request):
if self.proxy_auth is not None:
request = self.proxy_auth(request)
try:
self.pos = request.body.tell()
except AttributeError:
pass
request.register_hook('response', self.handle_407)
return super(GuessProxyAuth, self).__call__(request)
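# Illustrative sketch (not part of the original module): placeholder
# credentials and URL. The first 401 response is inspected and the matching
# scheme (Basic or Digest) is retried automatically.
#
#     import requests
#     from requests_toolbelt.auth.guess import GuessAuth
#
#     r = requests.get('https://httpbin.org/basic-auth/user/passwd',
#                      auth=GuessAuth('user', 'passwd'))
#     assert r.status_code == 200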

@@ -0,0 +1,142 @@
# -*- coding: utf-8 -*-
"""
requests_toolbelt.auth.handler
==============================
This holds all of the implementation details of the Authentication Handler.
"""
from requests.auth import AuthBase, HTTPBasicAuth
from requests.compat import urlparse, urlunparse
class AuthHandler(AuthBase):
"""
The ``AuthHandler`` object takes a dictionary of domains paired with
authentication strategies and will use this to determine which credentials
to use when making a request. For example, you could do the following:
.. code-block:: python
from requests.auth import HTTPDigestAuth
from requests_toolbelt.auth.handler import AuthHandler
import requests
auth = AuthHandler({
'https://api.github.com': ('sigmavirus24', 'fakepassword'),
'https://example.com': HTTPDigestAuth('username', 'password')
})
r = requests.get('https://api.github.com/user', auth=auth)
# => <Response [200]>
r = requests.get('https://example.com/some/path', auth=auth)
# => <Response [200]>
s = requests.Session()
s.auth = auth
r = s.get('https://api.github.com/user')
# => <Response [200]>
.. warning::
:class:`requests.auth.HTTPDigestAuth` is not yet thread-safe. If you
use :class:`AuthHandler` across multiple threads you should
instantiate a new AuthHandler for each thread with a new
HTTPDigestAuth instance for each thread.
"""
def __init__(self, strategies):
self.strategies = dict(strategies)
self._make_uniform()
def __call__(self, request):
auth = self.get_strategy_for(request.url)
return auth(request)
def __repr__(self):
return '<AuthHandler({0!r})>'.format(self.strategies)
def _make_uniform(self):
existing_strategies = list(self.strategies.items())
self.strategies = {}
for (k, v) in existing_strategies:
self.add_strategy(k, v)
@staticmethod
def _key_from_url(url):
parsed = urlparse(url)
return urlunparse((parsed.scheme.lower(),
parsed.netloc.lower(),
'', '', '', ''))
def add_strategy(self, domain, strategy):
"""Add a new domain and authentication strategy.
:param str domain: The domain you wish to match against. For example:
``'https://api.github.com'``
:param strategy: The authentication strategy you wish to use for
that domain. For example: ``('username', 'password')`` or
``requests.auth.HTTPDigestAuth('username', 'password')``
.. code-block:: python
a = AuthHandler({})
a.add_strategy('https://api.github.com', ('username', 'password'))
"""
# Turn tuples into Basic Authentication objects
if isinstance(strategy, tuple):
strategy = HTTPBasicAuth(*strategy)
key = self._key_from_url(domain)
self.strategies[key] = strategy
def get_strategy_for(self, url):
"""Retrieve the authentication strategy for a specified URL.
:param str url: The full URL you will be making a request against. For
example, ``'https://api.github.com/user'``
:returns: Callable that adds authentication to a request.
.. code-block:: python
import requests
a = AuthHandler({'example.com': ('foo', 'bar')})
strategy = a.get_strategy_for('http://example.com/example')
assert isinstance(strategy, requests.auth.HTTPBasicAuth)
"""
key = self._key_from_url(url)
return self.strategies.get(key, NullAuthStrategy())
def remove_strategy(self, domain):
"""Remove the domain and strategy from the collection of strategies.
:param str domain: The domain you wish to remove. For example,
``'https://api.github.com'``.
.. code-block:: python
a = AuthHandler({'example.com': ('foo', 'bar')})
a.remove_strategy('example.com')
assert a.strategies == {}
"""
key = self._key_from_url(domain)
if key in self.strategies:
del self.strategies[key]
class NullAuthStrategy(AuthBase):
def __repr__(self):
return '<NullAuthStrategy>'
def __call__(self, r):
return r

@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
"""The module containing HTTPProxyDigestAuth."""
import re
from requests import cookies, utils
from . import _digest_auth_compat as auth
class HTTPProxyDigestAuth(auth.HTTPDigestAuth):
"""HTTP digest authentication between proxy
:param stale_rejects: The number of rejects indicate that:
the client may wish to simply retry the request
with a new encrypted response, without reprompting the user for a
new username and password. i.e., retry build_digest_header
:type stale_rejects: int
"""
_pat = re.compile(r'digest ', flags=re.IGNORECASE)
def __init__(self, *args, **kwargs):
super(HTTPProxyDigestAuth, self).__init__(*args, **kwargs)
self.stale_rejects = 0
self.init_per_thread_state()
@property
def stale_rejects(self):
thread_local = getattr(self, '_thread_local', None)
if thread_local is None:
return self._stale_rejects
return thread_local.stale_rejects
@stale_rejects.setter
def stale_rejects(self, value):
thread_local = getattr(self, '_thread_local', None)
if thread_local is None:
self._stale_rejects = value
else:
thread_local.stale_rejects = value
def init_per_thread_state(self):
try:
super(HTTPProxyDigestAuth, self).init_per_thread_state()
except AttributeError:
# If we're not on requests 2.8.0+ this method does not exist
pass
def handle_407(self, r, **kwargs):
"""Handle HTTP 407 only once, otherwise give up
:param r: current response
:returns: responses, along with the new response
"""
if r.status_code == 407 and self.stale_rejects < 2:
s_auth = r.headers.get("proxy-authenticate")
if s_auth is None:
raise IOError(
"proxy server violated RFC 7235: "
"407 response MUST contain header proxy-authenticate")
elif not self._pat.match(s_auth):
return r
self.chal = utils.parse_dict_header(
self._pat.sub('', s_auth, count=1))
# if we present the user/passwd and still get rejected
# http://tools.ietf.org/html/rfc2617#section-3.2.1
if ('Proxy-Authorization' in r.request.headers and
'stale' in self.chal):
if self.chal['stale'].lower() == 'true': # try again
self.stale_rejects += 1
# wrong user/passwd
elif self.chal['stale'].lower() == 'false':
raise IOError("User or password is invalid")
# Consume content and release the original connection
# to allow our new request to reuse the same one.
r.content
r.close()
prep = r.request.copy()
cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
prep.prepare_cookies(prep._cookies)
prep.headers['Proxy-Authorization'] = self.build_digest_header(
prep.method, prep.url)
_r = r.connection.send(prep, **kwargs)
_r.history.append(r)
_r.request = prep
return _r
else: # give up authenticate
return r
def __call__(self, r):
self.init_per_thread_state()
# if we already have a nonce, use it; otherwise the server will send one
if self.last_nonce:
r.headers['Proxy-Authorization'] = self.build_digest_header(
r.method, r.url
)
r.register_hook('response', self.handle_407)
return r
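# Illustrative sketch (not part of the original module), assuming a
# digest-authenticating proxy at the placeholder address below:
#
#     import requests
#     from requests_toolbelt.auth.http_proxy_digest import HTTPProxyDigestAuth
#
#     proxies = {'http': 'http://proxy.example.com:8080'}
#     auth = HTTPProxyDigestAuth('username', 'password')
#     r = requests.get('http://httpbin.org/get', proxies=proxies, auth=auth)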

@@ -0,0 +1,7 @@
"""The module containing the code for ForgetfulCookieJar."""
from requests.cookies import RequestsCookieJar
class ForgetfulCookieJar(RequestsCookieJar):
def set_cookie(self, *args, **kwargs):
return
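# Illustrative sketch (not part of the original module): installing the jar
# on a Session so server-sent cookies are silently discarded. The import
# path is assumed from this file's location in the package.
#
#     import requests
#     from requests_toolbelt.cookies.forgetful import ForgetfulCookieJar
#
#     s = requests.Session()
#     s.cookies = ForgetfulCookieJar()
#     s.get('https://httpbin.org/cookies/set?k=v')
#     assert len(s.cookies) == 0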

@@ -0,0 +1,177 @@
# -*- coding: utf-8 -*-
"""Utilities for dealing with streamed requests."""
import collections
import os.path
import re
from .. import exceptions as exc
# Regular expressions stolen from werkzeug/http.py
# cd2c97bb0a076da2322f11adce0b2731f9193396 L62-L64
_QUOTED_STRING_RE = r'"[^"\\]*(?:\\.[^"\\]*)*"'
_OPTION_HEADER_PIECE_RE = re.compile(
r';\s*(%s|[^\s;=]+)\s*(?:=\s*(%s|[^;]+))?\s*' % (_QUOTED_STRING_RE,
_QUOTED_STRING_RE)
)
_DEFAULT_CHUNKSIZE = 512
def _get_filename(content_disposition):
for match in _OPTION_HEADER_PIECE_RE.finditer(content_disposition):
k, v = match.groups()
if k == 'filename':
# ignore any directory paths in the filename
return os.path.split(v)[1]
return None
def get_download_file_path(response, path):
"""
Given a response and a path, return a file path for a download.
If a ``path`` parameter is a directory, this function will parse the
``Content-Disposition`` header on the response to determine the name of the
file as reported by the server, and return a file path in the specified
directory.
If ``path`` is empty or None, this function will return a path relative
to the process' current working directory.
If path is a full file path, return it.
:param response: A Response object from requests
:type response: requests.models.Response
:param str path: Directory or file path.
:returns: full file path to download as
:rtype: str
:raises: :class:`requests_toolbelt.exceptions.StreamingError`
"""
path_is_dir = path and os.path.isdir(path)
if path and not path_is_dir:
# fully qualified file path
filepath = path
else:
response_filename = _get_filename(
response.headers.get('content-disposition', '')
)
if not response_filename:
raise exc.StreamingError('No filename given to stream response to')
if path_is_dir:
# directory to download to
filepath = os.path.join(path, response_filename)
else:
# fallback to downloading to current working directory
filepath = response_filename
return filepath
def stream_response_to_file(response, path=None, chunksize=_DEFAULT_CHUNKSIZE):
"""Stream a response body to the specified file.
Either use the ``path`` provided or use the name provided in the
``Content-Disposition`` header.
.. warning::
If you pass this function an open file-like object as the ``path``
parameter, the function will not close that file for you.
.. warning::
This function will not automatically close the response object
passed in as the ``response`` parameter.
If a ``path`` parameter is a directory, this function will parse the
``Content-Disposition`` header on the response to determine the name of the
file as reported by the server, and return a file path in the specified
directory. If no ``path`` parameter is supplied, this function will default
to the process' current working directory.
.. code-block:: python
import requests
from requests_toolbelt import exceptions
from requests_toolbelt.downloadutils import stream
r = requests.get(url, stream=True)
try:
filename = stream.stream_response_to_file(r)
except exceptions.StreamingError as e:
# The toolbelt could not find the filename in the
# Content-Disposition
print(e.message)
You can also specify the filename as a string. This will be passed to
the built-in :func:`open` and we will read the content into the file.
.. code-block:: python
import requests
from requests_toolbelt.downloadutils import stream
r = requests.get(url, stream=True)
filename = stream.stream_response_to_file(r, path='myfile')
If the calculated download file path already exists, this function will
raise a StreamingError.
Instead, if you want to manage the file object yourself, you need to
provide either a :class:`io.BytesIO` object or a file opened with the
`'b'` flag. See the two examples below for more details.
.. code-block:: python
import requests
from requests_toolbelt.downloadutils import stream
with open('myfile', 'wb') as fd:
r = requests.get(url, stream=True)
filename = stream.stream_response_to_file(r, path=fd)
print('{0} saved to {1}'.format(url, filename))
.. code-block:: python
import io
import requests
from requests_toolbelt.downloadutils import stream
b = io.BytesIO()
r = requests.get(url, stream=True)
filename = stream.stream_response_to_file(r, path=b)
assert filename is None
:param response: A Response object from requests
:type response: requests.models.Response
:param path: *(optional)*, Either a string with the path to the location
to save the response content, or a file-like object expecting bytes.
:type path: :class:`str`, or object with a :meth:`write`
:param int chunksize: (optional), Size of chunk to attempt to stream
(default 512B).
:returns: The name of the file, if one can be determined, else None
:rtype: str
:raises: :class:`requests_toolbelt.exceptions.StreamingError`
"""
pre_opened = False
fd = None
filename = None
if path and isinstance(getattr(path, 'write', None), collections.Callable):
pre_opened = True
fd = path
filename = getattr(fd, 'name', None)
else:
filename = get_download_file_path(response, path)
if os.path.exists(filename):
raise exc.StreamingError("File already exists: %s" % filename)
fd = open(filename, 'wb')
for chunk in response.iter_content(chunk_size=chunksize):
fd.write(chunk)
if not pre_opened:
fd.close()
return filename

@@ -0,0 +1,123 @@
"""Tee function implementations."""
import io
_DEFAULT_CHUNKSIZE = 65536
__all__ = ['tee', 'tee_to_file', 'tee_to_bytearray']
def _tee(response, callback, chunksize, decode_content):
for chunk in response.raw.stream(amt=chunksize,
decode_content=decode_content):
callback(chunk)
yield chunk
def tee(response, fileobject, chunksize=_DEFAULT_CHUNKSIZE,
decode_content=None):
"""Stream the response both to the generator and a file.
This will stream the response body while writing the bytes to
``fileobject``.
Example usage:
.. code-block:: python
resp = requests.get(url, stream=True)
with open('save_file', 'wb') as save_file:
for chunk in tee(resp, save_file):
# do stuff with chunk
.. code-block:: python
import io
resp = requests.get(url, stream=True)
fileobject = io.BytesIO()
for chunk in tee(resp, fileobject):
# do stuff with chunk
:param response: Response from requests.
:type response: requests.Response
:param fileobject: Writable file-like object.
:type fileobject: file, io.BytesIO
:param int chunksize: (optional), Size of chunk to attempt to stream.
:param bool decode_content: (optional), If True, this will decode the
compressed content of the response.
:raises: TypeError if the fileobject wasn't opened with the right mode
or isn't a BytesIO object.
"""
# We will be streaming the raw bytes from over the wire, so we need to
# ensure that writing to the fileobject will preserve those bytes. On
# Python3, if the user passes an io.StringIO, this will fail, so we need
# to check for BytesIO instead.
if not ('b' in getattr(fileobject, 'mode', '') or
isinstance(fileobject, io.BytesIO)):
raise TypeError('tee() will write bytes directly to this fileobject'
', it must be opened with the "b" flag if it is a file'
' or inherit from io.BytesIO.')
return _tee(response, fileobject.write, chunksize, decode_content)
def tee_to_file(response, filename, chunksize=_DEFAULT_CHUNKSIZE,
decode_content=None):
"""Stream the response both to the generator and a file.
This will open a file named ``filename`` and stream the response body
while writing the bytes to the opened file object.
Example usage:
.. code-block:: python
resp = requests.get(url, stream=True)
for chunk in tee_to_file(resp, 'save_file'):
# do stuff with chunk
:param response: Response from requests.
:type response: requests.Response
:param str filename: Name of file in which we write the response content.
:param int chunksize: (optional), Size of chunk to attempt to stream.
:param bool decode_content: (optional), If True, this will decode the
compressed content of the response.
"""
with open(filename, 'wb') as fd:
for chunk in tee(response, fd, chunksize, decode_content):
yield chunk
def tee_to_bytearray(response, bytearr, chunksize=_DEFAULT_CHUNKSIZE,
decode_content=None):
"""Stream the response both to the generator and a bytearray.
This will stream the chunks of the response provided to the function,
add them to the provided :class:`bytearray`, and yield them to the user.
.. note::
This uses the :meth:`bytearray.extend` by default instead of passing
the bytearray into the ``readinto`` method.
Example usage:
.. code-block:: python
b = bytearray()
resp = requests.get(url, stream=True)
for chunk in tee_to_bytearray(resp, b):
# do stuff with chunk
:param response: Response from requests.
:type response: requests.Response
:param bytearray bytearr: Array to add the streamed bytes to.
:param int chunksize: (optional), Size of chunk to attempt to stream.
:param bool decode_content: (optional), If True, this will decode the
compressed content of the response.
"""
if not isinstance(bytearr, bytearray):
raise TypeError('tee_to_bytearray() expects bytearr to be a '
'bytearray')
return _tee(response, bytearr.extend, chunksize, decode_content)
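# Illustrative sketch (not part of the original module): because _tee() only
# needs a per-chunk callable, the same pattern supports arbitrary side
# effects, e.g. hashing a download while saving it; the URL is a placeholder.
#
#     import hashlib
#     import requests
#     from requests_toolbelt.downloadutils import tee
#
#     digest = hashlib.sha256()
#     resp = requests.get('https://example.com/big-file', stream=True)
#     with open('big-file', 'wb') as fd:
#         for chunk in tee.tee(resp, fd):
#             digest.update(chunk)
#     print(digest.hexdigest())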

@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
"""Collection of exceptions raised by requests-toolbelt."""
class StreamingError(Exception):
"""Used in :mod:`requests_toolbelt.downloadutils.stream`."""
pass
class VersionMismatchError(Exception):
"""Used to indicate a version mismatch in the version of requests required.
The feature in use requires a newer version of Requests to function
appropriately but the version installed is not sufficient.
"""
pass
class RequestsVersionTooOld(Warning):
"""Used to indicate that the Requests version is too old.
If the version of Requests is too old to support a feature, we will issue
this warning to the user.
"""
pass
class IgnoringGAECertificateValidation(Warning):
"""Used to indicate that given GAE validation behavior will be ignored.
If the user has tried to specify certificate validation when using the
insecure AppEngine adapter, it will be ignored (certificate validation will
remain off), so we will issue this warning to the user.
In :class:`requests_toolbelt.adapters.appengine.InsecureAppEngineAdapter`.
"""
pass

@@ -0,0 +1,31 @@
"""
requests_toolbelt.multipart
===========================
See http://toolbelt.rtfd.org/ for documentation
:copyright: (c) 2014 by Ian Cordasco and Cory Benfield
:license: Apache v2.0, see LICENSE for more details
"""
from .encoder import MultipartEncoder, MultipartEncoderMonitor
from .decoder import MultipartDecoder
from .decoder import ImproperBodyPartContentException
from .decoder import NonMultipartContentTypeException
__title__ = 'requests-toolbelt'
__authors__ = 'Ian Cordasco, Cory Benfield'
__license__ = 'Apache v2.0'
__copyright__ = 'Copyright 2014 Ian Cordasco, Cory Benfield'
__all__ = [
'MultipartEncoder',
'MultipartEncoderMonitor',
'MultipartDecoder',
'ImproperBodyPartContentException',
'NonMultipartContentTypeException',
'__title__',
'__authors__',
'__license__',
'__copyright__',
]

@@ -0,0 +1,158 @@
# -*- coding: utf-8 -*-
"""
requests_toolbelt.multipart.decoder
===================================
This holds all the implementation details of the MultipartDecoder
"""
import sys
import email.parser
from .encoder import encode_with
from requests.structures import CaseInsensitiveDict
def _split_on_find(content, bound):
point = content.find(bound)
return content[:point], content[point + len(bound):]
class ImproperBodyPartContentException(Exception):
pass
class NonMultipartContentTypeException(Exception):
pass
def _header_parser(string, encoding):
major = sys.version_info[0]
if major == 3:
string = string.decode(encoding)
headers = email.parser.HeaderParser().parsestr(string).items()
return (
(encode_with(k, encoding), encode_with(v, encoding))
for k, v in headers
)
class BodyPart(object):
"""
The ``BodyPart`` object is a ``Response``-like interface to an individual
subpart of a multipart response. It is expected that these will
generally be created by objects of the ``MultipartDecoder`` class.
Like ``Response``, there is a ``CaseInsensitiveDict`` object named
``headers``, ``content`` to access bytes, ``text`` to access unicode, and
``encoding`` to access the unicode codec.
"""
def __init__(self, content, encoding):
self.encoding = encoding
headers = {}
# Split into header section (if any) and the content
if b'\r\n\r\n' in content:
first, self.content = _split_on_find(content, b'\r\n\r\n')
if first != b'':
headers = _header_parser(first.lstrip(), encoding)
else:
raise ImproperBodyPartContentException(
'content does not contain CR-LF-CR-LF'
)
self.headers = CaseInsensitiveDict(headers)
@property
def text(self):
"""Content of the ``BodyPart`` in unicode."""
return self.content.decode(self.encoding)
class MultipartDecoder(object):
"""
The ``MultipartDecoder`` object parses the multipart payload of
a bytestring into a tuple of ``Response``-like ``BodyPart`` objects.
The basic usage is::
import requests
from requests_toolbelt import MultipartDecoder
response = requests.get(url)
decoder = MultipartDecoder.from_response(response)
for part in decoder.parts:
print(part.headers[b'content-type'])
If the multipart content is not from a response, basic usage is::
from requests_toolbelt import MultipartDecoder
decoder = MultipartDecoder(content, content_type)
for part in decoder.parts:
print(part.headers[b'content-type'])
For both these usages, there is an optional ``encoding`` parameter. This is
a string, which is the name of the unicode codec to use (default is
``'utf-8'``).
"""
def __init__(self, content, content_type, encoding='utf-8'):
#: Original content
self.content = content
#: Original Content-Type header
self.content_type = content_type
#: Response body encoding
self.encoding = encoding
#: Parsed parts of the multipart response body
self.parts = tuple()
self._find_boundary()
self._parse_body()
def _find_boundary(self):
ct_info = tuple(x.strip() for x in self.content_type.split(';'))
mimetype = ct_info[0]
if mimetype.split('/')[0] != 'multipart':
raise NonMultipartContentTypeException(
"Unexpected mimetype in content-type: '{0}'".format(mimetype)
)
for item in ct_info[1:]:
attr, value = _split_on_find(
item,
'='
)
if attr.lower() == 'boundary':
self.boundary = encode_with(value.strip('"'), self.encoding)
@staticmethod
def _fix_first_part(part, boundary_marker):
bm_len = len(boundary_marker)
if boundary_marker == part[:bm_len]:
return part[bm_len:]
else:
return part
def _parse_body(self):
boundary = b''.join((b'--', self.boundary))
def body_part(part):
fixed = MultipartDecoder._fix_first_part(part, boundary)
return BodyPart(fixed, self.encoding)
def test_part(part):
return (part != b'' and
part != b'\r\n' and
part[:4] != b'--\r\n' and
part != b'--')
parts = self.content.split(b''.join((b'\r\n', boundary)))
self.parts = tuple(body_part(x) for x in parts if test_part(x))
@classmethod
def from_response(cls, response, encoding='utf-8'):
content = response.content
content_type = response.headers.get('content-type', None)
return cls(content, content_type, encoding)
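# Illustrative sketch (not part of the original module): decoding a
# hand-built multipart body without a Response object.
#
#     content = (b'--frontier\r\n'
#                b'Content-Type: text/plain\r\n\r\n'
#                b'hello\r\n'
#                b'--frontier--')
#     decoder = MultipartDecoder(
#         content, 'multipart/form-data; boundary=frontier')
#     for part in decoder.parts:
#         print(part.text)  # hello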

@@ -0,0 +1,570 @@
# -*- coding: utf-8 -*-
"""
requests_toolbelt.multipart.encoder
===================================
This holds all of the implementation details of the MultipartEncoder
"""
import contextlib
import io
import os
from uuid import uuid4
from .._compat import fields
class MultipartEncoder(object):
"""
The ``MultipartEncoder`` object is a generic interface to the engine that
will create a ``multipart/form-data`` body for you.
The basic usage is:
.. code-block:: python
import requests
from requests_toolbelt import MultipartEncoder
encoder = MultipartEncoder({'field': 'value',
'other_field': 'other_value'})
r = requests.post('https://httpbin.org/post', data=encoder,
headers={'Content-Type': encoder.content_type})
If you do not need to take advantage of streaming the post body, you can
also do:
.. code-block:: python
r = requests.post('https://httpbin.org/post',
data=encoder.to_string(),
headers={'Content-Type': encoder.content_type})
If you want the encoder to use a specific order, you can use an
OrderedDict or more simply, a list of tuples:
.. code-block:: python
encoder = MultipartEncoder([('field', 'value'),
('other_field', 'other_value')])
.. versionchanged:: 0.4.0
You can also provide tuples as part values as you would provide them to
requests' ``files`` parameter.
.. code-block:: python
encoder = MultipartEncoder({
'field': ('file_name', b'{"a": "b"}', 'application/json',
{'X-My-Header': 'my-value'})
})
.. warning::
This object will end up directly in :mod:`httplib`. Currently,
:mod:`httplib` has a hard-coded read size of **8192 bytes**. This
means that it will loop until the file has been read and your upload
could take a while. This is **not** a bug in requests. A feature is
being considered for this object to allow you, the user, to specify
what size should be returned on a read. If you have opinions on this,
please weigh in on `this issue`_.
.. _this issue:
https://github.com/sigmavirus24/requests-toolbelt/issues/75
"""
def __init__(self, fields, boundary=None, encoding='utf-8'):
#: Boundary value either passed in by the user or created
self.boundary_value = boundary or uuid4().hex
# Computed boundary
self.boundary = '--{0}'.format(self.boundary_value)
#: Encoding of the data being passed in
self.encoding = encoding
# Pre-encoded boundary
self._encoded_boundary = b''.join([
encode_with(self.boundary, self.encoding),
encode_with('\r\n', self.encoding)
])
#: Fields provided by the user
self.fields = fields
#: Whether or not the encoder is finished
self.finished = False
#: Pre-computed parts of the upload
self.parts = []
# Pre-computed parts iterator
self._iter_parts = iter([])
# The part we're currently working with
self._current_part = None
# Cached computation of the body's length
self._len = None
# Our buffer
self._buffer = CustomBytesIO(encoding=encoding)
# Pre-compute each part's headers
self._prepare_parts()
# Load boundary into buffer
self._write_boundary()
@property
def len(self):
"""Length of the multipart/form-data body.
requests will first attempt to get the length of the body by calling
``len(body)`` and then by checking for the ``len`` attribute.
On 32-bit systems, the ``__len__`` method cannot return anything
larger than an integer (in C) can hold. If the total size of the body
is even slightly larger than 4GB users will see an OverflowError. This
manifested itself in `bug #80`_.
As such, we now calculate the length lazily as a property.
.. _bug #80:
https://github.com/sigmavirus24/requests-toolbelt/issues/80
"""
# If _len isn't already calculated, calculate, return, and set it
return self._len or self._calculate_length()
def __repr__(self):
return '<MultipartEncoder: {0!r}>'.format(self.fields)
def _calculate_length(self):
"""
This uses the parts to calculate the length of the body.
This returns the calculated length so __len__ can be lazy.
"""
boundary_len = len(self.boundary) # Length of --{boundary}
# boundary length + header length + body length + len('\r\n') * 2
self._len = sum(
(boundary_len + total_len(p) + 4) for p in self.parts
) + boundary_len + 4
return self._len
def _calculate_load_amount(self, read_size):
"""This calculates how many bytes need to be added to the buffer.
When a consumer read's ``x`` from the buffer, there are two cases to
satisfy:
1. Enough data in the buffer to return the requested amount
2. Not enough data
This function uses the amount of unread bytes in the buffer and
determines how much the Encoder has to load before it can return the
requested amount of bytes.
:param int read_size: the number of bytes the consumer requests
:returns: int -- the number of bytes that must be loaded into the
buffer before the read can be satisfied. This will be strictly
non-negative
"""
amount = read_size - total_len(self._buffer)
return amount if amount > 0 else 0
def _load(self, amount):
"""Load ``amount`` number of bytes into the buffer."""
self._buffer.smart_truncate()
part = self._current_part or self._next_part()
while amount == -1 or amount > 0:
written = 0
if not part.bytes_left_to_write():
written += self._write(b'\r\n')
written += self._write_boundary()
part = self._next_part()
if not part:
written += self._write_closing_boundary()
self.finished = True
break
written += part.write_to(self._buffer, amount)
if amount != -1:
amount -= written
def _next_part(self):
try:
p = self._current_part = next(self._iter_parts)
except StopIteration:
p = None
return p
def _iter_fields(self):
_fields = self.fields
if hasattr(self.fields, 'items'):
_fields = list(self.fields.items())
for k, v in _fields:
file_name = None
file_type = None
file_headers = None
if isinstance(v, (list, tuple)):
if len(v) == 2:
file_name, file_pointer = v
elif len(v) == 3:
file_name, file_pointer, file_type = v
else:
file_name, file_pointer, file_type, file_headers = v
else:
file_pointer = v
field = fields.RequestField(name=k, data=file_pointer,
filename=file_name,
headers=file_headers)
field.make_multipart(content_type=file_type)
yield field
def _prepare_parts(self):
"""This uses the fields provided by the user and creates Part objects.
It populates the `parts` attribute and uses that to create a
generator for iteration.
"""
enc = self.encoding
self.parts = [Part.from_field(f, enc) for f in self._iter_fields()]
self._iter_parts = iter(self.parts)
def _write(self, bytes_to_write):
"""Write the bytes to the end of the buffer.
:param bytes bytes_to_write: byte-string (or bytearray) to append to
the buffer
:returns: int -- the number of bytes written
"""
return self._buffer.append(bytes_to_write)
def _write_boundary(self):
"""Write the boundary to the end of the buffer."""
return self._write(self._encoded_boundary)
def _write_closing_boundary(self):
"""Write the bytes necessary to finish a multipart/form-data body."""
with reset(self._buffer):
self._buffer.seek(-2, 2)
self._buffer.write(b'--\r\n')
return 2
def _write_headers(self, headers):
"""Write the current part's headers to the buffer."""
return self._write(encode_with(headers, self.encoding))
@property
def content_type(self):
return str(
'multipart/form-data; boundary={0}'.format(self.boundary_value)
)
def to_string(self):
"""Return the entirety of the data in the encoder.
.. note::
This simply reads all of the data it can. If you have started
streaming or reading data from the encoder, this method will only
return whatever data is left in the encoder.
.. note::
This method affects the internal state of the encoder. Calling
this method will exhaust the encoder.
:returns: the multipart message
:rtype: bytes
"""
return self.read()
def read(self, size=-1):
"""Read data from the streaming encoder.
:param int size: (optional), If provided, ``read`` will return exactly
that many bytes. If it is not provided, it will return the
remaining bytes.
:returns: bytes
"""
if self.finished:
return self._buffer.read(size)
bytes_to_load = size
if bytes_to_load != -1 and bytes_to_load is not None:
bytes_to_load = self._calculate_load_amount(int(size))
self._load(bytes_to_load)
return self._buffer.read(size)
def IDENTITY(monitor):
return monitor
class MultipartEncoderMonitor(object):
"""
An object used to monitor the progress of a :class:`MultipartEncoder`.
The :class:`MultipartEncoder` should only be responsible for preparing and
streaming the data. For anyone who wishes to monitor it, they shouldn't be
using that instance to manage that as well. Using this class, they can
monitor an encoder and register a callback. The callback receives the
instance of the monitor.
To use this monitor, you construct your :class:`MultipartEncoder` as you
normally would.
.. code-block:: python
from requests_toolbelt import (MultipartEncoder,
MultipartEncoderMonitor)
import requests
def callback(monitor):
# Do something with this information
pass
m = MultipartEncoder(fields={'field0': 'value0'})
monitor = MultipartEncoderMonitor(m, callback)
headers = {'Content-Type': monitor.content_type}
r = requests.post('https://httpbin.org/post', data=monitor,
headers=headers)
Alternatively, if your use case is very simple, you can use the following
pattern.
.. code-block:: python
from requests_toolbelt import MultipartEncoderMonitor
import requests
def callback(monitor):
# Do something with this information
pass
monitor = MultipartEncoderMonitor.from_fields(
fields={'field0': 'value0'}, callback=callback
)
headers = {'Content-Type': monitor.content_type}
r = requests.post('https://httpbin.org/post', data=monitor,
headers=headers)
"""
def __init__(self, encoder, callback=None):
#: Instance of the :class:`MultipartEncoder` being monitored
self.encoder = encoder
#: Optional function to call after a read
self.callback = callback or IDENTITY
#: Number of bytes already read from the :class:`MultipartEncoder`
#: instance
self.bytes_read = 0
#: Avoid the same problem in bug #80
self.len = self.encoder.len
@classmethod
def from_fields(cls, fields, boundary=None, encoding='utf-8',
callback=None):
encoder = MultipartEncoder(fields, boundary, encoding)
return cls(encoder, callback)
@property
def content_type(self):
return self.encoder.content_type
def to_string(self):
return self.read()
def read(self, size=-1):
string = self.encoder.read(size)
self.bytes_read += len(string)
self.callback(self)
return string
def encode_with(string, encoding):
"""Encoding ``string`` with ``encoding`` if necessary.
:param str string: If string is a bytes object, it will not encode it.
Otherwise, this function will encode it with the provided encoding.
:param str encoding: The encoding with which to encode string.
:returns: encoded bytes object
"""
if not (string is None or isinstance(string, bytes)):
return string.encode(encoding)
return string
def readable_data(data, encoding):
"""Coerce the data to an object with a ``read`` method."""
if hasattr(data, 'read'):
return data
return CustomBytesIO(data, encoding)
def total_len(o):
if hasattr(o, '__len__'):
return len(o)
if hasattr(o, 'len'):
return o.len
if hasattr(o, 'fileno'):
try:
fileno = o.fileno()
except io.UnsupportedOperation:
pass
else:
return os.fstat(fileno).st_size
if hasattr(o, 'getvalue'):
# e.g. BytesIO, cStringIO.StringIO
return len(o.getvalue())
@contextlib.contextmanager
def reset(buffer):
"""Keep track of the buffer's current position and write to the end.
This is a context manager meant to be used when adding data to the buffer.
It eliminates the need for every function to be concerned with the
position of the cursor in the buffer.
"""
original_position = buffer.tell()
buffer.seek(0, 2)
yield
buffer.seek(original_position, 0)
def coerce_data(data, encoding):
"""Ensure that every object's __len__ behaves uniformly."""
if not isinstance(data, CustomBytesIO):
if hasattr(data, 'getvalue'):
return CustomBytesIO(data.getvalue(), encoding)
if hasattr(data, 'fileno'):
return FileWrapper(data)
if not hasattr(data, 'read'):
return CustomBytesIO(data, encoding)
return data
def to_list(fields):
if hasattr(fields, 'items'):
return list(fields.items())
return list(fields)
class Part(object):
def __init__(self, headers, body):
self.headers = headers
self.body = body
self.headers_unread = True
self.len = len(self.headers) + total_len(self.body)
@classmethod
def from_field(cls, field, encoding):
"""Create a part from a Request Field generated by urllib3."""
headers = encode_with(field.render_headers(), encoding)
body = coerce_data(field.data, encoding)
return cls(headers, body)
def bytes_left_to_write(self):
"""Determine if there are bytes left to write.
:returns: bool -- ``True`` if there are bytes left to write, otherwise
``False``
"""
to_read = 0
if self.headers_unread:
to_read += len(self.headers)
return (to_read + total_len(self.body)) > 0
def write_to(self, buffer, size):
"""Write the requested amount of bytes to the buffer provided.
The number of bytes written may exceed size on the first read since we
load the headers ambitiously.
:param CustomBytesIO buffer: buffer we want to write bytes to
:param int size: number of bytes requested to be written to the buffer
:returns: int -- number of bytes actually written
"""
written = 0
if self.headers_unread:
written += buffer.append(self.headers)
self.headers_unread = False
while total_len(self.body) > 0 and (size == -1 or written < size):
amount_to_read = size
if size != -1:
amount_to_read = size - written
written += buffer.append(self.body.read(amount_to_read))
return written
class CustomBytesIO(io.BytesIO):
def __init__(self, buffer=None, encoding='utf-8'):
buffer = encode_with(buffer, encoding)
super(CustomBytesIO, self).__init__(buffer)
def _get_end(self):
current_pos = self.tell()
self.seek(0, 2)
length = self.tell()
self.seek(current_pos, 0)
return length
@property
def len(self):
length = self._get_end()
return length - self.tell()
def append(self, bytes):
with reset(self):
written = self.write(bytes)
return written
def smart_truncate(self):
to_be_read = total_len(self)
already_read = self._get_end() - to_be_read
if already_read >= to_be_read:
old_bytes = self.read()
self.seek(0, 0)
self.truncate()
self.write(old_bytes)
self.seek(0, 0) # We want to be at the beginning
class FileWrapper(object):
def __init__(self, file_object):
self.fd = file_object
@property
def len(self):
return total_len(self.fd) - self.fd.tell()
def read(self, length=-1):
return self.fd.read(length)
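# Illustrative sketch (not part of the original module): streaming a file
# upload without reading it into memory; file name and URL are placeholders.
#
#     import requests
#     from requests_toolbelt import MultipartEncoder
#
#     with open('archive.zip', 'rb') as fd:
#         encoder = MultipartEncoder(
#             {'file': ('archive.zip', fd, 'application/zip')})
#         r = requests.post('https://httpbin.org/post', data=encoder,
#                           headers={'Content-Type': encoder.content_type})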

@@ -0,0 +1,70 @@
import requests
from ._compat import urljoin
class BaseUrlSession(requests.Session):
"""A Session with a URL that all requests will use as a base.
Let's start by looking at an example:
.. code-block:: python
>>> from requests_toolbelt import sessions
>>> s = sessions.BaseUrlSession(
... base_url='https://example.com/resource/')
>>> r = s.get('sub-resource/', params={'foo': 'bar'})
>>> print(r.request.url)
https://example.com/resource/sub-resource/?foo=bar
Our call to the ``get`` method will make a request to the URL passed in
when we created the Session and the partial resource name we provide.
We implement this by overriding the ``request`` method so most uses of a
Session are covered. (This, however, precludes the use of PreparedRequest
objects).
.. note::
The base URL that you provide and the path you provide are **very**
important.
Let's look at another *similar* example
.. code-block:: python
>>> from requests_toolbelt import sessions
>>> s = sessions.BaseUrlSession(
... base_url='https://example.com/resource/')
>>> r = s.get('/sub-resource/', params={'foo': 'bar'})
>>> print(r.request.url)
https://example.com/sub-resource/?foo=bar
The key difference here is that we called ``get`` with ``/sub-resource/``,
i.e., there was a leading ``/``. This changes how we create the URL
because we rely on :mod:`urllib.parse.urljoin`.
To override how we generate the URL, sub-class this class and override the
``create_url`` method.
Based on implementation from
https://github.com/kennethreitz/requests/issues/2554#issuecomment-109341010
"""
base_url = None
def __init__(self, base_url=None):
if base_url:
self.base_url = base_url
super(BaseUrlSession, self).__init__()
def request(self, method, url, *args, **kwargs):
"""Send the request after generating the complete URL."""
url = self.create_url(url)
return super(BaseUrlSession, self).request(
method, url, *args, **kwargs
)
def create_url(self, url):
"""Create the URL based off this partial path."""
return urljoin(self.base_url, url)
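# Illustrative sketch (not part of the original module): overriding
# ``create_url`` as suggested above, e.g. to always treat the given path as
# relative regardless of a leading '/'. The subclass name is hypothetical.
#
#     class RelativeUrlSession(BaseUrlSession):
#         def create_url(self, url):
#             return self.base_url.rstrip('/') + '/' + url.lstrip('/')
#
#     s = RelativeUrlSession(base_url='https://example.com/resource')
#     # s.get('/sub-resource') now requests
#     # https://example.com/resource/sub-resource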

@@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
"""
requests_toolbelt.streaming_iterator
====================================
This holds the implementation details for the :class:`StreamingIterator`. It
is designed for the case where you, the user, know the size of the upload but
need to provide the data as an iterator. This class will allow you to specify
the size and stream the data without using a chunked transfer-encoding.
"""
from requests.utils import super_len
from .multipart.encoder import CustomBytesIO, encode_with
class StreamingIterator(object):
"""
This class provides a way of allowing iterators with a known size to be
streamed instead of chunked.
In requests, if you pass in an iterator it assumes you want to use
chunked transfer-encoding to upload the data, which not all servers
support well. Additionally, you may want to set the content-length
yourself to avoid this but that will not work. The only way to preempt
requests using a chunked transfer-encoding and forcing it to stream the
uploads is to mimic a very specific interace. Instead of having to know
these details you can instead just use this class. You simply provide the
size and iterator and pass the instance of StreamingIterator to requests
via the data parameter like so:
.. code-block:: python
from requests_toolbelt import StreamingIterator
import requests
# Let iterator be some generator that you already have and size be
# the size of the data produced by the iterator
r = requests.post(url, data=StreamingIterator(size, iterator))
You can also pass file-like objects to :py:class:`StreamingIterator` in
case requests can't determine the filesize itself. This is the case with
streaming file objects like ``stdin`` or any sockets. Wrapping e.g. files
that are on disk with ``StreamingIterator`` is unnecessary, because
requests can determine the filesize itself.
Naturally, you should also set the `Content-Type` of your upload
appropriately because the toolbelt will not attempt to guess that for you.
"""
def __init__(self, size, iterator, encoding='utf-8'):
#: The expected size of the upload
self.size = int(size)
if self.size < 0:
raise ValueError(
'The size of the upload must be a non-negative integer'
)
#: Attribute that requests will check to determine the length of the
#: body. See bug #80 for more details
self.len = self.size
#: Encoding the input data is using
self.encoding = encoding
#: The iterator used to generate the upload data
self.iterator = iterator
if hasattr(iterator, 'read'):
self._file = iterator
else:
self._file = _IteratorAsBinaryFile(iterator, encoding)
def read(self, size=-1):
return encode_with(self._file.read(size), self.encoding)
class _IteratorAsBinaryFile(object):
def __init__(self, iterator, encoding='utf-8'):
#: The iterator used to generate the upload data
self.iterator = iterator
#: Encoding the iterator is using
self.encoding = encoding
# The buffer we use to provide the correct number of bytes requested
# during a read
self._buffer = CustomBytesIO()
def _get_bytes(self):
try:
return encode_with(next(self.iterator), self.encoding)
except StopIteration:
return b''
def _load_bytes(self, size):
self._buffer.smart_truncate()
amount_to_load = size - super_len(self._buffer)
bytes_to_append = True
while amount_to_load > 0 and bytes_to_append:
bytes_to_append = self._get_bytes()
amount_to_load -= self._buffer.append(bytes_to_append)
def read(self, size=-1):
    size = int(size)
    if size == -1:
        # Drain anything already buffered, then encode and join the
        # rest of the iterator so text-yielding iterators work too.
        return self._buffer.read() + b''.join(
            encode_with(chunk, self.encoding) for chunk in self.iterator
        )
    self._load_bytes(size)
    return self._buffer.read(size)

View File

@ -0,0 +1,97 @@
"""
This module provides the API for ``requests_toolbelt.threaded``.
The module provides a clean and simple API for making requests via a thread
pool. The thread pool will use sessions for increased performance.
A simple use-case is:
.. code-block:: python
from requests_toolbelt import threaded
urls_to_get = [{
'url': 'https://api.github.com/users/sigmavirus24',
'method': 'GET',
}, {
'url': 'https://api.github.com/repos/sigmavirus24/requests-toolbelt',
'method': 'GET',
}, {
'url': 'https://google.com',
'method': 'GET',
}]
responses, errors = threaded.map(urls_to_get)
By default, the threaded submodule will detect the number of CPUs your
computer has and use that if no other number of processes is selected. To
change this, pass the keyword argument ``num_processes``. Using the
above example, we would expand it like so:
.. code-block:: python
responses, errors = threaded.map(urls_to_get, num_processes=10)
You can also customize how a :class:`requests.Session` is initialized by
creating a callback function:
.. code-block:: python
from requests_toolbelt import user_agent
def initialize_session(session):
session.headers['User-Agent'] = user_agent('my-scraper', '0.1')
session.headers['Accept'] = 'application/json'
responses, errors = threaded.map(urls_to_get,
initializer=initialize_session)
.. autofunction:: requests_toolbelt.threaded.map
Inspiration is blatantly drawn from the standard library's multiprocessing
library. See the following references:
- multiprocessing's `pool source`_
- map and map_async `inspiration`_
.. _pool source:
https://hg.python.org/cpython/file/8ef4f75a8018/Lib/multiprocessing/pool.py
.. _inspiration:
https://hg.python.org/cpython/file/8ef4f75a8018/Lib/multiprocessing/pool.py#l340
"""
from . import pool
from .._compat import queue
def map(requests, **kwargs):
r"""Simple interface to the threaded Pool object.
This function takes a list of dictionaries representing requests to make
using Sessions in threads and returns a tuple where the first item is
a generator of successful responses and the second is a generator of
exceptions.
:param list requests:
Collection of dictionaries representing requests to make with the Pool
object.
:param \*\*kwargs:
Keyword arguments that are passed to the
:class:`~requests_toolbelt.threaded.pool.Pool` object.
:returns: Tuple of responses and exceptions from the pool
:rtype: (:class:`~requests_toolbelt.threaded.pool.ThreadResponse`,
:class:`~requests_toolbelt.threaded.pool.ThreadException`)
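A consumption sketch, reusing the ``urls_to_get`` list from the module
documentation above:

.. code-block:: python

    responses, errors = threaded.map(urls_to_get)
    for response in responses:
        print(response.status_code)
    for error in errors:
        print(error.exception)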
"""
if not (requests and all(isinstance(r, dict) for r in requests)):
raise ValueError('map expects a list of dictionaries.')
# Build our queue of requests
job_queue = queue.Queue()
for request in requests:
job_queue.put(request)
# Ensure the user doesn't try to pass their own job_queue
kwargs['job_queue'] = job_queue
threadpool = pool.Pool(**kwargs)
threadpool.join_all()
return threadpool.responses(), threadpool.exceptions()

View File

@ -0,0 +1,211 @@
"""Module implementing the Pool for :mod:``requests_toolbelt.threaded``."""
import multiprocessing
import requests
from . import thread
from .._compat import queue
class Pool(object):
"""Pool that manages the threads containing sessions.
:param job_queue:
    The queue of request keyword arguments that the threads will consume.
:type job_queue: queue.Queue
:param initializer:
Function used to initialize an instance of ``session``.
:type initializer: collections.Callable
:param auth_generator:
Function used to generate new auth credentials for the session.
:type auth_generator: collections.Callable
:param int num_threads:
Number of threads to create.
:param session:
    A callable that returns a new :class:`requests.Session` instance,
    e.g., the class itself.
:type session: requests.Session
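A short usage sketch (the URL is illustrative):

.. code-block:: python

    from requests_toolbelt.threaded import pool

    p = pool.Pool.from_urls(['https://httpbin.org/get'])
    p.join_all()
    for response in p.responses():
        print(response.status_code)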
"""
def __init__(self, job_queue, initializer=None, auth_generator=None,
num_processes=None, session=requests.Session):
if num_processes is None:
num_processes = multiprocessing.cpu_count() or 1
if num_processes < 1:
raise ValueError("Number of processes should at least be 1.")
self._job_queue = job_queue
self._response_queue = queue.Queue()
self._exc_queue = queue.Queue()
self._processes = num_processes
self._initializer = initializer or _identity
self._auth = auth_generator or _identity
self._session = session
self._pool = [
thread.SessionThread(self._new_session(), self._job_queue,
self._response_queue, self._exc_queue)
for _ in range(self._processes)
]
def _new_session(self):
return self._auth(self._initializer(self._session()))
@classmethod
def from_exceptions(cls, exceptions, **kwargs):
r"""Create a :class:`~Pool` from an :class:`~ThreadException`\ s.
Provided an iterable that provides :class:`~ThreadException` objects,
this classmethod will generate a new pool to retry the requests that
caused the exceptions.
:param exceptions:
Iterable that returns :class:`~ThreadException`
:type exceptions: iterable
:param kwargs:
Keyword arguments passed to the :class:`~Pool` initializer.
:returns: An initialized :class:`~Pool` object.
:rtype: :class:`~Pool`
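A retry sketch, assuming ``old_pool`` has already finished its jobs:

.. code-block:: python

    retry_pool = Pool.from_exceptions(old_pool.exceptions())
    retry_pool.join_all()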
"""
job_queue = queue.Queue()
for exc in exceptions:
job_queue.put(exc.request_kwargs)
return cls(job_queue=job_queue, **kwargs)
@classmethod
def from_urls(cls, urls, request_kwargs=None, **kwargs):
"""Create a :class:`~Pool` from an iterable of URLs.
:param urls:
Iterable that returns URLs with which we create a pool.
:type urls: iterable
:param dict request_kwargs:
Dictionary of other keyword arguments to provide to the request
method.
:param kwargs:
Keyword arguments passed to the :class:`~Pool` initializer.
:returns: An initialized :class:`~Pool` object.
:rtype: :class:`~Pool`
"""
request_dict = {'method': 'GET'}
request_dict.update(request_kwargs or {})
job_queue = queue.Queue()
for url in urls:
job = request_dict.copy()
job.update({'url': url})
job_queue.put(job)
return cls(job_queue=job_queue, **kwargs)
def exceptions(self):
"""Iterate over all the exceptions in the pool.
:returns: Generator of :class:`~ThreadException`
"""
while True:
exc = self.get_exception()
if exc is None:
break
yield exc
def get_exception(self):
"""Get an exception from the pool.
:rtype: :class:`~ThreadException`
"""
try:
(request, exc) = self._exc_queue.get_nowait()
except queue.Empty:
return None
else:
return ThreadException(request, exc)
def get_response(self):
"""Get a response from the pool.
:rtype: :class:`~ThreadResponse`
"""
try:
(request, response) = self._response_queue.get_nowait()
except queue.Empty:
return None
else:
return ThreadResponse(request, response)
def responses(self):
"""Iterate over all the responses in the pool.
:returns: Generator of :class:`~ThreadResponse`
"""
while True:
resp = self.get_response()
if resp is None:
break
yield resp
def join_all(self):
"""Join all the threads to the master thread."""
for session_thread in self._pool:
session_thread.join()
class ThreadProxy(object):
proxied_attr = None
def __getattr__(self, attr):
    """Proxy attribute accesses to the proxied object."""
    get = object.__getattribute__
    if attr not in self.attrs:
        # Everything else is looked up on the wrapped object (the
        # response or the exception, depending on the subclass).
        proxied = get(self, self.proxied_attr)
        return getattr(proxied, attr)
    else:
        return get(self, attr)
class ThreadResponse(ThreadProxy):
"""A wrapper around a requests Response object.
This will proxy most attribute access actions to the Response object. For
example, if you wanted the parsed JSON from the response, you might do:
.. code-block:: python
thread_response = pool.get_response()
json = thread_response.json()
"""
proxied_attr = 'response'
attrs = frozenset(['request_kwargs', 'response'])
def __init__(self, request_kwargs, response):
#: The original keyword arguments provided to the queue
self.request_kwargs = request_kwargs
#: The wrapped response
self.response = response
class ThreadException(ThreadProxy):
"""A wrapper around an exception raised during a request.
This will proxy most attribute access actions to the exception object. For
example, if you wanted the message from the exception, you might do:
.. code-block:: python
thread_exc = pool.get_exception()
msg = thread_exc.message
"""
proxied_attr = 'exception'
attrs = frozenset(['request_kwargs', 'exception'])
def __init__(self, request_kwargs, exception):
#: The original keyword arguments provided to the queue
self.request_kwargs = request_kwargs
#: The captured and wrapped exception
self.exception = exception
def _identity(session_obj):
return session_obj
__all__ = ['ThreadException', 'ThreadResponse', 'Pool']

View File

@ -0,0 +1,53 @@
"""Module containing the SessionThread class."""
import threading
import uuid
import requests.exceptions as exc
from .._compat import queue
class SessionThread(object):
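"""Worker that pulls request kwargs off a job queue with one session.

Each instance owns a daemonized worker thread that sends requests with
the session it was given and sorts the results into the response and
exception queues.
"""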
def __init__(self, initialized_session, job_queue, response_queue,
exception_queue):
self._session = initialized_session
self._jobs = job_queue
self._create_worker()
self._responses = response_queue
self._exceptions = exception_queue
def _create_worker(self):
self._worker = threading.Thread(
target=self._make_request,
name=uuid.uuid4(),
)
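# Daemonize the worker so a hung request can't block interpreter shutdown.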
self._worker.daemon = True
self._worker._state = 0
self._worker.start()
def _handle_request(self, kwargs):
try:
response = self._session.request(**kwargs)
except exc.RequestException as e:
self._exceptions.put((kwargs, e))
else:
self._responses.put((kwargs, response))
finally:
self._jobs.task_done()
def _make_request(self):
while True:
try:
kwargs = self._jobs.get_nowait()
except queue.Empty:
break
self._handle_request(kwargs)
def is_alive(self):
"""Proxy to the thread's ``is_alive`` method."""
return self._worker.is_alive()
def join(self):
"""Join this thread to the master thread."""
self._worker.join()

View File

View File

@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
"""A collection of functions deprecated in requests.utils."""
import re
import sys
from requests import utils
find_charset = re.compile(
br'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I
).findall
find_pragma = re.compile(
br'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I
).findall
find_xml = re.compile(
br'^<\?xml.*?encoding=["\']*(.+?)["\'>]'
).findall
def get_encodings_from_content(content):
"""Return encodings from given content string.
.. code-block:: python
import requests
from requests_toolbelt.utils import deprecated
r = requests.get(url)
encodings = deprecated.get_encodings_from_content(r.content)
:param content: bytestring to extract encodings from
:type content: bytes
:return: encodings detected in the provided content
:rtype: list(str)
"""
encodings = (find_charset(content) + find_pragma(content)
+ find_xml(content))
if (3, 0) <= sys.version_info < (4, 0):
encodings = [encoding.decode('utf8') for encoding in encodings]
return encodings
def get_unicode_from_response(response):
"""Return the requested content back in unicode.
This will first attempt to retrieve the encoding from the response
headers. If that fails, it will use
:func:`requests_toolbelt.utils.deprecated.get_encodings_from_content`
to determine encodings from HTML elements.
.. code-block:: python
import requests
from requests_toolbelt.utils import deprecated
r = requests.get(url)
text = deprecated.get_unicode_from_response(r)
:param response: Response object to get unicode content from.
:type response: requests.models.Response
"""
tried_encodings = set()
# Try charset from content-type
encoding = utils.get_encoding_from_headers(response.headers)
if encoding:
try:
return str(response.content, encoding)
except UnicodeError:
tried_encodings.add(encoding.lower())
encodings = get_encodings_from_content(response.content)
for _encoding in encodings:
_encoding = _encoding.lower()
if _encoding in tried_encodings:
continue
try:
return str(response.content, _encoding)
except UnicodeError:
tried_encodings.add(_encoding)
# Fall back:
if encoding:
try:
return str(response.content, encoding, errors='replace')
except TypeError:
pass
return response.text

View File

@ -0,0 +1,195 @@
"""This module provides functions for dumping information about responses."""
import collections
from requests import compat
__all__ = ('dump_response', 'dump_all')
HTTP_VERSIONS = {
9: b'0.9',
10: b'1.0',
11: b'1.1',
}
_PrefixSettings = collections.namedtuple('PrefixSettings',
['request', 'response'])
class PrefixSettings(_PrefixSettings):
def __new__(cls, request, response):
request = _coerce_to_bytes(request)
response = _coerce_to_bytes(response)
return super(PrefixSettings, cls).__new__(cls, request, response)
def _get_proxy_information(response):
if getattr(response.connection, 'proxy_manager', False):
proxy_info = {}
request_url = response.request.url
if request_url.startswith('https://'):
proxy_info['method'] = 'CONNECT'
proxy_info['request_path'] = request_url
return proxy_info
return None
def _format_header(name, value):
return (_coerce_to_bytes(name) + b': ' + _coerce_to_bytes(value) +
b'\r\n')
def _build_request_path(url, proxy_info):
uri = compat.urlparse(url)
proxy_url = proxy_info.get('request_path')
if proxy_url is not None:
return proxy_url, uri
request_path = _coerce_to_bytes(uri.path)
if uri.query:
request_path += b'?' + _coerce_to_bytes(uri.query)
return request_path, uri
def _dump_request_data(request, prefixes, bytearr, proxy_info=None):
if proxy_info is None:
proxy_info = {}
prefix = prefixes.request
method = _coerce_to_bytes(proxy_info.pop('method', request.method))
request_path, uri = _build_request_path(request.url, proxy_info)
# <prefix><METHOD> <request-path> HTTP/1.1
bytearr.extend(prefix + method + b' ' + request_path + b' HTTP/1.1\r\n')
# <prefix>Host: <request-host> OR host header specified by user
headers = request.headers.copy()
host_header = _coerce_to_bytes(headers.pop('Host', uri.netloc))
bytearr.extend(prefix + b'Host: ' + host_header + b'\r\n')
for name, value in headers.items():
bytearr.extend(prefix + _format_header(name, value))
bytearr.extend(prefix + b'\r\n')
if request.body:
if isinstance(request.body, compat.basestring):
bytearr.extend(prefix + _coerce_to_bytes(request.body))
else:
# In the event that the body is a file-like object, let's not try
# to read everything into memory.
bytearr.extend(b'<< Request body is not a string-like type >>')
bytearr.extend(b'\r\n')
def _dump_response_data(response, prefixes, bytearr):
prefix = prefixes.response
# Let's interact almost entirely with urllib3's response
raw = response.raw
# Let's convert the version int from httplib to bytes
version_str = HTTP_VERSIONS.get(raw.version, b'?')
# <prefix>HTTP/<version_str> <status_code> <reason>
bytearr.extend(prefix + b'HTTP/' + version_str + b' ' +
str(raw.status).encode('ascii') + b' ' +
_coerce_to_bytes(response.reason) + b'\r\n')
headers = raw.headers
for name in headers.keys():
for value in headers.getlist(name):
bytearr.extend(prefix + _format_header(name, value))
bytearr.extend(prefix + b'\r\n')
bytearr.extend(response.content)
def _coerce_to_bytes(data):
if not isinstance(data, bytes) and hasattr(data, 'encode'):
data = data.encode('utf-8')
return data
def dump_response(response, request_prefix=b'< ', response_prefix=b'> ',
data_array=None):
"""Dump a single request-response cycle's information.
This will take a response object and dump only the data that requests can
see for that single request-response cycle.
Example::
import requests
from requests_toolbelt.utils import dump
resp = requests.get('https://api.github.com/users/sigmavirus24')
data = dump.dump_response(resp)
print(data.decode('utf-8'))
:param response:
The response to format
:type response: :class:`requests.Response`
:param request_prefix: (*optional*)
Bytes to prefix each line of the request data
:type request_prefix: :class:`bytes`
:param response_prefix: (*optional*)
Bytes to prefix each line of the response data
:type response_prefix: :class:`bytes`
:param data_array: (*optional*)
Bytearray to which we append the request-response cycle data
:type data_array: :class:`bytearray`
:returns: Formatted bytes of request and response information.
:rtype: :class:`bytearray`
"""
data = data_array if data_array is not None else bytearray()
prefixes = PrefixSettings(request_prefix, response_prefix)
if not hasattr(response, 'request'):
raise ValueError('Response has no associated request')
proxy_info = _get_proxy_information(response)
_dump_request_data(response.request, prefixes, data,
proxy_info=proxy_info)
_dump_response_data(response, prefixes, data)
return data
def dump_all(response, request_prefix=b'< ', response_prefix=b'> '):
"""Dump all requests and responses including redirects.
This takes the response returned by requests and will dump all
request-response pairs in the redirect history in order followed by the
final request-response.
Example::
import requests
from requests_toolbelt.utils import dump
resp = requests.get('https://httpbin.org/redirect/5')
data = dump.dump_all(resp)
print(data.decode('utf-8'))
:param response:
The response to format
:type response: :class:`requests.Response`
:param request_prefix: (*optional*)
Bytes to prefix each line of the request data
:type request_prefix: :class:`bytes`
:param response_prefix: (*optional*)
Bytes to prefix each line of the response data
:type response_prefix: :class:`bytes`
:returns: Formatted bytes of request and response information.
:rtype: :class:`bytearray`
"""
data = bytearray()
history = list(response.history)
history.append(response)
for response in history:
dump_response(response, request_prefix, response_prefix, data)
return data

View File

@ -0,0 +1,108 @@
# -*- coding: utf-8 -*-
"""Implementation of nested form-data encoding function(s)."""
from .._compat import basestring
from .._compat import urlencode as _urlencode
__all__ = ('urlencode',)
def urlencode(query, *args, **kwargs):
"""Handle nested form-data queries and serialize them appropriately.
There are times when a website expects a nested form-data query to be
sent but the standard library's urlencode function does not handle
nested structures appropriately. In that case, you need this function,
which will flatten the structure first and then properly encode it for
you.
When using this to send data in the body of a request, make sure you
specify the appropriate Content-Type header for the request.
.. code-block:: python
import requests
from requests_toolbelt.utils import formdata
query = {
'my_dict': {
'foo': 'bar',
'biz': 'baz",
},
'a': 'b',
}
resp = requests.get(url, params=formdata.urlencode(query))
# or
resp = requests.post(
url,
data=formdata.urlencode(query),
headers={
'Content-Type': 'application/x-www-form-urlencoded'
},
)
Similarly, you can specify a list of nested tuples, e.g.,
.. code-block:: python
import requests
from requests_toolbelt.utils import formdata
query = [
('my_list', [
('foo', 'bar'),
('biz', 'baz'),
]),
('a', 'b'),
]
resp = requests.get(url, params=formdata.urlencode(query))
# or
resp = requests.post(
url,
data=formdata.urlencode(query),
headers={
'Content-Type': 'application/x-www-form-urlencoded'
},
)
For additional parameter and return information, see the official
`urlencode`_ documentation.
.. _urlencode:
https://docs.python.org/3/library/urllib.parse.html#urllib.parse.urlencode
"""
expand_classes = (dict, list, tuple)
original_query_list = _to_kv_list(query)
if not all(_is_two_tuple(i) for i in original_query_list):
raise ValueError("Expected query to be able to be converted to a "
"list comprised of length 2 tuples.")
query_list = original_query_list
while any(isinstance(v, expand_classes) for _, v in query_list):
query_list = _expand_query_values(query_list)
return _urlencode(query_list, *args, **kwargs)
def _to_kv_list(dict_or_list):
if hasattr(dict_or_list, 'items'):
return list(dict_or_list.items())
return dict_or_list
def _is_two_tuple(item):
return isinstance(item, (list, tuple)) and len(item) == 2
def _expand_query_values(original_query_list):
query_list = []
for key, value in original_query_list:
if isinstance(value, basestring):
query_list.append((key, value))
else:
key_fmt = key + '[%s]'
value_list = _to_kv_list(value)
query_list.extend((key_fmt % k, v) for k, v in value_list)
return query_list

View File

@ -0,0 +1,143 @@
# -*- coding: utf-8 -*-
import collections
import platform
import sys
def user_agent(name, version, extras=None):
"""Return an internet-friendly user_agent string.
The majority of this code has been wilfully stolen from the equivalent
function in Requests.
:param name: The intended name of the user-agent, e.g. "python-requests".
:param version: The version of the user-agent, e.g. "0.0.1".
:param extras: List of two-item tuples that are added to the user-agent
string.
:returns: Formatted user-agent string
:rtype: str
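For example (the names and versions here are illustrative):

.. code-block:: python

    from requests_toolbelt import user_agent

    ua = user_agent('my-tool', '1.0', extras=[('requests', '2.14.2')])
    # e.g. 'my-tool/1.0 requests/2.14.2 CPython/2.7.13 Linux/4.4.0'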
"""
if extras is None:
extras = []
return UserAgentBuilder(
name, version
).include_extras(
extras
).include_implementation(
).include_system().build()
class UserAgentBuilder(object):
"""Class to provide a greater level of control than :func:`user_agent`.
This is used by :func:`user_agent` to build its User-Agent string.
.. code-block:: python
user_agent_str = UserAgentBuilder(
name='requests-toolbelt',
version='17.4.0',
).include_implementation(
).include_system(
).include_extras([
('requests', '2.14.2'),
('urllib3', '1.21.2'),
]).build()
"""
format_string = '%s/%s'
def __init__(self, name, version):
"""Initialize our builder with the name and version of our user agent.
:param str name:
Name of our user-agent.
:param str version:
The version string for user-agent.
"""
self._pieces = collections.deque([(name, version)])
def build(self):
"""Finalize the User-Agent string.
:returns:
Formatted User-Agent string.
:rtype:
str
"""
return " ".join([self.format_string % piece for piece in self._pieces])
def include_extras(self, extras):
"""Include extra portions of the User-Agent.
:param list extras:
list of tuples of extra-name and extra-version
"""
if any(len(extra) != 2 for extra in extras):
raise ValueError('Extras should be a sequence of two item tuples.')
self._pieces.extend(extras)
return self
def include_implementation(self):
"""Append the implementation string to the user-agent string.
This appends, for example, the fact that you're running CPython 2.7.13
to the User-Agent.
"""
self._pieces.append(_implementation_tuple())
return self
def include_system(self):
"""Append the information about the Operating System."""
self._pieces.append(_platform_tuple())
return self
def _implementation_tuple():
"""Return the tuple of interpreter name and version.
Returns a tuple that provides both the name and the version of the Python
implementation currently running. For example, on CPython 2.7.5 it will
return ``('CPython', '2.7.5')``.
This function works best on CPython and PyPy: in particular, it probably
doesn't work for Jython or IronPython. Future investigation should be done
to work out the correct shape of the code for those platforms.
"""
implementation = platform.python_implementation()
if implementation == 'CPython':
implementation_version = platform.python_version()
elif implementation == 'PyPy':
implementation_version = '%s.%s.%s' % (sys.pypy_version_info.major,
sys.pypy_version_info.minor,
sys.pypy_version_info.micro)
if sys.pypy_version_info.releaselevel != 'final':
implementation_version = ''.join([
implementation_version, sys.pypy_version_info.releaselevel
])
elif implementation == 'Jython':
implementation_version = platform.python_version() # Complete Guess
elif implementation == 'IronPython':
implementation_version = platform.python_version() # Complete Guess
else:
implementation_version = 'Unknown'
return (implementation, implementation_version)
def _implementation_string():
return "%s/%s" % _implementation_tuple()
def _platform_tuple():
try:
p_system = platform.system()
p_release = platform.release()
except IOError:
p_system = 'Unknown'
p_release = 'Unknown'
return (p_system, p_release)

7
setup.cfg Normal file
View File

@ -0,0 +1,7 @@
[wheel]
universal = 1
[egg_info]
tag_build =
tag_date = 0

68
setup.py Normal file
View File

@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
import os
import re
import sys
from setuptools import setup
if sys.argv[-1].lower() in ("submit", "publish"):
os.system("python setup.py bdist_wheel sdist upload")
sys.exit()
def get_version():
version = ''
with open('requests_toolbelt/__init__.py', 'r') as fd:
reg = re.compile(r'__version__ = [\'"]([^\'"]*)[\'"]')
for line in fd:
m = reg.match(line)
if m:
version = m.group(1)
break
return version
__version__ = get_version()
if not __version__:
raise RuntimeError('Cannot find version information')
packages = [
'requests_toolbelt',
'requests_toolbelt.adapters',
'requests_toolbelt.auth',
'requests_toolbelt.downloadutils',
'requests_toolbelt.multipart',
'requests_toolbelt.threaded',
'requests_toolbelt.utils',
]
setup(
name="requests-toolbelt",
version=__version__,
description="A utility belt for advanced users of python-requests",
long_description="\n\n".join([open("README.rst").read(),
open("HISTORY.rst").read()]),
license='Apache 2.0',
author='Ian Cordasco, Cory Benfield',
author_email="graffatcolmingov@gmail.com",
url="https://toolbelt.readthedocs.org",
packages=packages,
package_data={'': ['LICENSE', 'AUTHORS.rst']},
include_package_data=True,
install_requires=['requests>=2.0.1,<3.0.0'],
classifiers=[
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: Apache Software License',
'Intended Audience :: Developers',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: Implementation :: CPython',
],
)

8
tests/__init__.py Normal file
View File

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
import betamax
def get_betamax(session):
return betamax.Betamax(
session,
cassette_library_dir='tests/cassettes')

View File

@ -0,0 +1 @@
{"recorded_with": "betamax/0.5.1", "http_interactions": [{"response": {"status": {"code": 302, "message": "FOUND"}, "body": {"string": "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 3.2 Final//EN\">\n<title>Redirecting...</title>\n<h1>Redirecting...</h1>\n<p>You should be redirected automatically to target URL: <a href=\"/cookies\">/cookies</a>. If not click the link.", "encoding": "utf-8"}, "url": "https://httpbin.org/cookies/set?cookie0=value0", "headers": {"Location": ["/cookies"], "Content-Length": ["223"], "Date": ["Fri, 13 Nov 2015 00:23:20 GMT"], "Access-Control-Allow-Credentials": ["true"], "Access-Control-Allow-Origin": ["*"], "Connection": ["keep-alive"], "Server": ["nginx"], "Set-Cookie": ["cookie0=value0; Path=/"], "Content-Type": ["text/html; charset=utf-8"]}}, "recorded_at": "2015-11-13T00:23:19", "request": {"uri": "https://httpbin.org/cookies/set?cookie0=value0", "method": "GET", "body": {"string": "", "encoding": "utf-8"}, "headers": {"Connection": ["keep-alive"], "User-Agent": ["python-requests/2.8.1"], "Accept-Encoding": ["gzip, deflate"], "Accept": ["*/*"]}}}, {"response": {"status": {"code": 200, "message": "OK"}, "body": {"string": "{\n \"cookies\": {\n \"cookie0\": \"value0\"\n }\n}\n", "encoding": null}, "url": "https://httpbin.org/cookies", "headers": {"Access-Control-Allow-Credentials": ["true"], "Content-Length": ["47"], "Date": ["Fri, 13 Nov 2015 00:23:20 GMT"], "Content-Type": ["application/json"], "Connection": ["keep-alive"], "Server": ["nginx"], "Access-Control-Allow-Origin": ["*"]}}, "recorded_at": "2015-11-13T00:23:19", "request": {"uri": "https://httpbin.org/cookies", "method": "GET", "body": {"string": "", "encoding": "utf-8"}, "headers": {"Connection": ["keep-alive"], "User-Agent": ["python-requests/2.8.1"], "Accept-Encoding": ["gzip, deflate"], "Accept": ["*/*"], "Cookie": ["cookie0=value0"]}}}]}

View File

@ -0,0 +1 @@
{"recorded_with": "betamax/0.4.1", "http_interactions": [{"response": {"status": {"message": "OK", "code": 200}, "body": {"string": "{\n \"args\": {}, \n \"headers\": {\n \"Accept\": \"*/*\", \n \"Accept-Encoding\": \"gzip, deflate\", \n \"Connection\": \"keep-alive\", \n \"Host\": \"http2bin.org\", \n \"User-Agent\": \"python-requests/2.5.3 CPython/2.7.9 Darwin/14.1.0\"\n }, \n \"origin\": \"77.99.146.203\", \n \"url\": \"https://http2bin.org/get\"\n}\n", "encoding": null}, "headers": {"access-control-allow-origin": ["*"], "date": ["Tue, 03 Mar 2015 21:29:55 GMT"], "server": ["h2o/1.0.2-alpha1"], "content-length": ["301"], "access-control-allow-credentials": ["true"], "connection": ["keep-alive"], "content-type": ["application/json"]}, "url": "https://http2bin.org/get"}, "recorded_at": "2015-03-03T21:29:55", "request": {"method": "GET", "uri": "https://http2bin.org/get", "body": {"string": "", "encoding": "utf-8"}, "headers": {"Accept": ["*/*"], "Accept-Encoding": ["gzip, deflate"], "Connection": ["keep-alive"], "User-Agent": ["python-requests/2.5.3 CPython/2.7.9 Darwin/14.1.0"]}}}]}

View File

@ -0,0 +1 @@
{"http_interactions": [{"request": {"body": {"string": "", "encoding": "utf-8"}, "headers": {"Accept-Encoding": ["gzip, deflate, compress"], "Accept": ["*/*"], "User-Agent": ["python-requests/2.2.1 CPython/2.7.6 Linux/3.14.1-1-ARCH"]}, "method": "GET", "uri": "http://httpbin.org/basic-auth/user/passwd"}, "response": {"body": {"string": "", "encoding": null}, "headers": {"content-length": ["0"], "server": ["gunicorn/0.17.4"], "connection": ["keep-alive"], "date": ["Sat, 03 May 2014 17:23:06 GMT"], "access-control-allow-origin": ["*"], "www-authenticate": ["Basic realm=\"Fake Realm\""]}, "status": {"message": "UNAUTHORIZED", "code": 401}, "url": "http://httpbin.org/basic-auth/user/passwd"}, "recorded_at": "2014-05-03T17:23:06"}, {"request": {"body": {"string": "", "encoding": "utf-8"}, "headers": {"Accept": ["*/*"], "Accept-Encoding": ["gzip, deflate, compress"], "Authorization": ["Basic dXNlcjpwYXNzd2Q="], "User-Agent": ["python-requests/2.2.1 CPython/2.7.6 Linux/3.14.1-1-ARCH"]}, "method": "GET", "uri": "http://httpbin.org/basic-auth/user/passwd"}, "response": {"body": {"string": "{\n \"user\": \"user\",\n \"authenticated\": true\n}", "encoding": null}, "headers": {"content-length": ["45"], "server": ["gunicorn/0.17.4"], "connection": ["keep-alive"], "date": ["Sat, 03 May 2014 17:23:06 GMT"], "access-control-allow-origin": ["*"], "content-type": ["application/json"]}, "status": {"message": "OK", "code": 200}, "url": "http://httpbin.org/basic-auth/user/passwd"}, "recorded_at": "2014-05-03T17:23:06"}], "recorded_with": "betamax/{version}"}

View File

@ -0,0 +1 @@
{"http_interactions": [{"request": {"body": {"string": "", "encoding": "utf-8"}, "headers": {"Accept-Encoding": ["gzip, deflate, compress"], "Accept": ["*/*"], "User-Agent": ["python-requests/2.2.1 CPython/2.7.6 Linux/3.14.1-1-ARCH"]}, "method": "GET", "uri": "http://httpbin.org/digest-auth/auth/user/passwd"}, "response": {"body": {"string": "", "encoding": "utf-8"}, "headers": {"content-length": ["0"], "set-cookie": ["fake=fake_value"], "server": ["gunicorn/0.17.4"], "connection": ["keep-alive"], "date": ["Sat, 03 May 2014 17:23:07 GMT"], "access-control-allow-origin": ["*"], "content-type": ["text/html; charset=utf-8"], "www-authenticate": ["Digest qop=auth, nonce=\"713b4eb6d0ad0ac25d75b50c4d044d5e\", realm=\"me@kennethreitz.com\", opaque=\"d0033bc1960ca78a2fc4497c1e8a8cbd\""]}, "status": {"message": "UNAUTHORIZED", "code": 401}, "url": "http://httpbin.org/digest-auth/auth/user/passwd"}, "recorded_at": "2014-05-03T17:23:07"}, {"request": {"body": {"string": "", "encoding": "utf-8"}, "headers": {"Accept": ["*/*"], "Cookie": ["fake=fake_value"], "Accept-Encoding": ["gzip, deflate, compress"], "Authorization": ["Digest username=\"user\", realm=\"me@kennethreitz.com\", nonce=\"713b4eb6d0ad0ac25d75b50c4d044d5e\", uri=\"/digest-auth/auth/user/passwd\", response=\"30276b25ef0031e65e3bccc719031388\", opaque=\"d0033bc1960ca78a2fc4497c1e8a8cbd\", qop=\"auth\", nc=00000001, cnonce=\"e94e00be64d66bcb\""], "User-Agent": ["python-requests/2.2.1 CPython/2.7.6 Linux/3.14.1-1-ARCH"]}, "method": "GET", "uri": "http://httpbin.org/digest-auth/auth/user/passwd"}, "response": {"body": {"string": "{\n \"user\": \"user\",\n \"authenticated\": true\n}", "encoding": null}, "headers": {"content-length": ["45"], "server": ["gunicorn/0.17.4"], "connection": ["keep-alive"], "date": ["Sat, 03 May 2014 17:23:07 GMT"], "access-control-allow-origin": ["*"], "content-type": ["application/json"]}, "status": {"message": "OK", "code": 200}, "url": "http://httpbin.org/digest-auth/auth/user/passwd"}, "recorded_at": "2014-05-03T17:23:07"}], "recorded_with": "betamax/{version}"}

View File

@ -0,0 +1 @@
{"http_interactions": [{"request": {"body": {"string": "", "encoding": "utf-8"}, "headers": {"Accept-Encoding": ["gzip, deflate, compress"], "Accept": ["*/*"], "User-Agent": ["python-requests/2.2.1 CPython/2.7.6 Linux/3.14.1-1-ARCH"]}, "method": "GET", "uri": "http://httpbin.org/get?a=1"}, "response": {"body": {"string": "{\n \"args\": {\n \"a\": \"1\"\n },\n \"url\": \"http://httpbin.org/get?a=1\",\n \"headers\": {\n \"Connection\": \"close\",\n \"Host\": \"httpbin.org\",\n \"Accept-Encoding\": \"gzip, deflate, compress\",\n \"X-Request-Id\": \"f9f71f12-5705-4a0f-85d4-3d63f9140b1f\",\n \"User-Agent\": \"python-requests/2.2.1 CPython/2.7.6 Linux/3.14.1-1-ARCH\",\n \"Accept\": \"*/*\"\n },\n \"origin\": \"62.47.252.115\"\n}", "encoding": null}, "headers": {"content-length": ["381"], "server": ["gunicorn/0.17.4"], "connection": ["keep-alive"], "date": ["Sat, 03 May 2014 17:23:07 GMT"], "access-control-allow-origin": ["*"], "content-type": ["application/json"]}, "status": {"message": "OK", "code": 200}, "url": "http://httpbin.org/get?a=1"}, "recorded_at": "2014-05-03T17:23:07"}], "recorded_with": "betamax/{version}"}

View File

@ -0,0 +1 @@
{"http_interactions": [{"request": {"body": "", "headers": {"Accept-Encoding": "gzip, deflate, compress", "Accept": "*/*", "User-Agent": "python-requests/2.1.0 CPython/2.7.3 Linux/3.2.29"}, "method": "GET", "uri": "https://klevas.vu.lt/"}, "response": {"body": {"string": "<html>\n<title>\nKlevas\n</title>\n<head>\n<script language=\"javascript\" type=\"text/javascript\">\n <!--\n window.location=\"https://klevas.vu.lt/pls/klevas/logon\";\n // -->\n </script>\n</head>\n</html>\n\n", "encoding": "ISO-8859-1"}, "headers": {"content-length": "204", "accept-ranges": "bytes", "server": "Oracle-Application-Server-10g/10.1.3.1.0 Oracle-HTTP-Server", "last-modified": "Wed, 13 Apr 2011 05:00:23 GMT", "etag": "\"7f9b-cc-4da52de7\"", "date": "Sun, 05 Jan 2014 01:35:40 GMT", "content-type": "text/html"}, "url": "https://klevas.vu.lt/", "status_code": 200}, "recorded_at": "2014-01-05T01:34:40"}], "recorded_with": "betamax"}

View File

@ -0,0 +1 @@
{"recorded_with": "betamax/0.5.1", "http_interactions": [{"recorded_at": "2015-11-14T22:53:20", "request": {"uri": "https://httpbin.org/redirect/5", "method": "GET", "body": {"string": "", "encoding": "utf-8"}, "headers": {"Connection": "keep-alive", "Accept": "*/*", "User-Agent": "python-requests/2.8.1", "Accept-Encoding": "gzip, deflate"}}, "response": {"url": "https://httpbin.org/redirect/5", "status": {"code": 302, "message": "FOUND"}, "body": {"string": "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 3.2 Final//EN\">\n<title>Redirecting...</title>\n<h1>Redirecting...</h1>\n<p>You should be redirected automatically to target URL: <a href=\"/relative-redirect/4\">/relative-redirect/4</a>. If not click the link.", "encoding": "utf-8"}, "headers": {"Location": "/relative-redirect/4", "Access-Control-Allow-Credentials": "true", "Server": "nginx", "Date": "Sat, 14 Nov 2015 22:53:18 GMT", "Content-Length": "247", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", "Content-Type": "text/html; charset=utf-8"}}}, {"recorded_at": "2015-11-14T22:53:20", "request": {"uri": "https://httpbin.org/relative-redirect/4", "method": "GET", "body": {"string": "", "encoding": "utf-8"}, "headers": {"Connection": "keep-alive", "Accept": "*/*", "User-Agent": "python-requests/2.8.1", "Accept-Encoding": "gzip, deflate"}}, "response": {"url": "https://httpbin.org/relative-redirect/4", "status": {"code": 302, "message": "FOUND"}, "body": {"string": "", "encoding": "utf-8"}, "headers": {"Location": "/relative-redirect/3", "Access-Control-Allow-Credentials": "true", "Server": "nginx", "Date": "Sat, 14 Nov 2015 22:53:18 GMT", "Content-Length": "0", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", "Content-Type": "text/html; charset=utf-8"}}}, {"recorded_at": "2015-11-14T22:53:20", "request": {"uri": "https://httpbin.org/relative-redirect/3", "method": "GET", "body": {"string": "", "encoding": "utf-8"}, "headers": {"Connection": "keep-alive", "Accept": "*/*", "User-Agent": "python-requests/2.8.1", "Accept-Encoding": "gzip, deflate"}}, "response": {"url": "https://httpbin.org/relative-redirect/3", "status": {"code": 302, "message": "FOUND"}, "body": {"string": "", "encoding": "utf-8"}, "headers": {"Location": "/relative-redirect/2", "Access-Control-Allow-Credentials": "true", "Server": "nginx", "Date": "Sat, 14 Nov 2015 22:53:18 GMT", "Content-Length": "0", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", "Content-Type": "text/html; charset=utf-8"}}}, {"recorded_at": "2015-11-14T22:53:20", "request": {"uri": "https://httpbin.org/relative-redirect/2", "method": "GET", "body": {"string": "", "encoding": "utf-8"}, "headers": {"Connection": "keep-alive", "Accept": "*/*", "User-Agent": "python-requests/2.8.1", "Accept-Encoding": "gzip, deflate"}}, "response": {"url": "https://httpbin.org/relative-redirect/2", "status": {"code": 302, "message": "FOUND"}, "body": {"string": "", "encoding": "utf-8"}, "headers": {"Location": "/relative-redirect/1", "Access-Control-Allow-Credentials": "true", "Server": "nginx", "Date": "Sat, 14 Nov 2015 22:53:18 GMT", "Content-Length": "0", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", "Content-Type": "text/html; charset=utf-8"}}}, {"recorded_at": "2015-11-14T22:53:20", "request": {"uri": "https://httpbin.org/relative-redirect/1", "method": "GET", "body": {"string": "", "encoding": "utf-8"}, "headers": {"Connection": "keep-alive", "Accept": "*/*", "User-Agent": "python-requests/2.8.1", "Accept-Encoding": "gzip, deflate"}}, "response": {"url": 
"https://httpbin.org/relative-redirect/1", "status": {"code": 302, "message": "FOUND"}, "body": {"string": "", "encoding": "utf-8"}, "headers": {"Location": "/get", "Access-Control-Allow-Credentials": "true", "Server": "nginx", "Date": "Sat, 14 Nov 2015 22:53:18 GMT", "Content-Length": "0", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", "Content-Type": "text/html; charset=utf-8"}}}, {"recorded_at": "2015-11-14T22:53:20", "request": {"uri": "https://httpbin.org/get", "method": "GET", "body": {"string": "", "encoding": "utf-8"}, "headers": {"Connection": "keep-alive", "Accept": "*/*", "User-Agent": "python-requests/2.8.1", "Accept-Encoding": "gzip, deflate"}}, "response": {"url": "https://httpbin.org/get", "status": {"code": 200, "message": "OK"}, "body": {"string": "{\n \"args\": {}, \n \"headers\": {\n \"Accept\": \"*/*\", \n \"Accept-Encoding\": \"gzip, deflate\", \n \"Host\": \"httpbin.org\", \n \"User-Agent\": \"python-requests/2.8.1\"\n }, \n \"origin\": \"<IPADDR>\", \n \"url\": \"https://httpbin.org/get\"\n}\n", "encoding": null}, "headers": {"Access-Control-Allow-Credentials": "true", "Server": "nginx", "Date": "Sat, 14 Nov 2015 22:53:18 GMT", "Content-Length": "239", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", "Content-Type": "application/json"}}}]}

View File

@ -0,0 +1 @@
{"recorded_with": "betamax/0.5.1", "http_interactions": [{"request": {"body": {"encoding": "utf-8", "string": ""}, "uri": "https://httpbin.org/get", "headers": {"Connection": ["keep-alive"], "User-Agent": ["python-requests/2.8.1"], "Accept": ["*/*"], "Accept-Encoding": ["gzip, deflate"]}, "method": "GET"}, "recorded_at": "2015-11-14T22:33:32", "response": {"status": {"code": 200, "message": "OK"}, "url": "https://httpbin.org/get", "body": {"encoding": null, "string": "{\n \"args\": {}, \n \"headers\": {\n \"Accept\": \"*/*\", \n \"Accept-Encoding\": \"gzip, deflate\", \n \"Host\": \"httpbin.org\", \n \"User-Agent\": \"python-requests/2.8.1\"\n }, \n \"origin\": \"<IPADDR>\", \n \"url\": \"https://httpbin.org/get\"\n}\n"}, "headers": {"Content-Type": ["application/json"], "Date": ["Sat, 14 Nov 2015 22:33:30 GMT"], "Connection": ["keep-alive"], "Server": ["nginx"], "Access-Control-Allow-Credentials": ["true"], "Content-Length": ["239"], "Access-Control-Allow-Origin": ["*"]}}}]}

File diff suppressed because one or more lines are too long

15
tests/conftest.py Normal file
View File

@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
import os
import sys
import betamax
sys.path.insert(0, '.')
placeholders = {
'<IPADDR>': os.environ.get('IPADDR', '127.0.0.1'),
}
with betamax.Betamax.configure() as config:
for placeholder, value in placeholders.items():
config.define_cassette_placeholder(placeholder, value)

View File

@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
"""Tests for the AppEngineAdapter."""
import sys
import mock
import pytest
import requests
from requests_toolbelt import exceptions as exc
REQUESTS_SUPPORTS_GAE = requests.__build__ >= 0x021000
if REQUESTS_SUPPORTS_GAE:
from requests.packages.urllib3.contrib import appengine as urllib3_appeng
from requests_toolbelt.adapters import appengine
else:
appengine = urllib3_appeng = None
@pytest.mark.skipif(sys.version_info >= (3,),
reason="App Engine doesn't support Python 3 (yet) and "
"urllib3's appengine contrib code is Python 2 "
"only. Until the latter changes, this test will "
"be skipped, unfortunately.")
@pytest.mark.skipif(not REQUESTS_SUPPORTS_GAE,
reason="Requires Requests v2.10.0 or later")
@mock.patch.object(urllib3_appeng, 'urlfetch')
def test_get(mock_urlfetch):
"""Tests a simple requests.get() call.
App Engine urlfetch docs:
https://cloud.google.com/appengine/docs/python/refdocs/google.appengine.api.urlfetch
"""
response = mock.Mock(status_code=200, content='asdf', headers={})
mock_urlfetch.fetch = mock.Mock(return_value=response)
session = requests.Session()
session.mount('http://', appengine.AppEngineAdapter())
resp = session.get('http://url/', timeout=9, headers={'Foo': 'bar'})
assert resp.status_code == 200
assert resp.content == 'asdf'
args, kwargs = mock_urlfetch.fetch.call_args
assert args == ('http://url/',)
assert kwargs['deadline'] == 9
assert kwargs['headers']['Foo'] == 'bar'
@pytest.mark.skipif(sys.version_info >= (3,),
reason="App Engine doesn't support Python 3 (yet) and "
"urllib3's appengine contrib code is Python 2 "
"only. Until the latter changes, this test will "
"be skipped, unfortunately.")
@pytest.mark.skipif(not REQUESTS_SUPPORTS_GAE,
reason="Requires Requests v2.10.0 or later")
def test_appengine_monkeypatch():
"""Tests monkeypatching Requests adapters for AppEngine compatibility.
"""
adapter = requests.sessions.HTTPAdapter
appengine.monkeypatch()
assert requests.sessions.HTTPAdapter == appengine.AppEngineAdapter
assert requests.adapters.HTTPAdapter == appengine.AppEngineAdapter
appengine.monkeypatch(validate_certificate=False)
assert requests.sessions.HTTPAdapter == appengine.InsecureAppEngineAdapter
assert requests.adapters.HTTPAdapter == appengine.InsecureAppEngineAdapter
requests.sessions.HTTPAdapter = adapter
requests.adapters.HTTPAdapter = adapter
@pytest.mark.skipif(sys.version_info >= (3,),
reason="App Engine doesn't support Python 3 (yet) and "
"urllib3's appengine contrib code is Python 2 "
"only. Until the latter changes, this test will "
"be skipped, unfortunately.")
@pytest.mark.skipif(not REQUESTS_SUPPORTS_GAE,
reason="Requires Requests v2.10.0 or later")
@mock.patch.object(urllib3_appeng, 'urlfetch')
def test_insecure_appengine_adapter(mock_urlfetch):
adapter = appengine.InsecureAppEngineAdapter()
assert not adapter._validate_certificate
with pytest.warns(exc.IgnoringGAECertificateValidation):
adapter = appengine.InsecureAppEngineAdapter(validate_certificate=True)

77
tests/test_auth.py Normal file
View File

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
import requests
import unittest
import mock
from requests_toolbelt.auth.guess import GuessAuth, GuessProxyAuth
from . import get_betamax
class TestGuessAuth(unittest.TestCase):
def setUp(self):
self.session = requests.Session()
self.recorder = get_betamax(self.session)
def cassette(self, name):
return self.recorder.use_cassette(
'httpbin_guess_auth_' + name,
match_requests_on=['method', 'uri', 'digest-auth']
)
def test_basic(self):
with self.cassette('basic'):
r = self.session.request(
'GET', 'http://httpbin.org/basic-auth/user/passwd',
auth=GuessAuth('user', 'passwd'))
assert r.json() == {'authenticated': True, 'user': 'user'}
def test_digest(self):
with self.cassette('digest'):
r = self.session.request(
'GET', 'http://httpbin.org/digest-auth/auth/user/passwd',
auth=GuessAuth('user', 'passwd'))
assert r.json() == {'authenticated': True, 'user': 'user'}
def test_no_auth(self):
with self.cassette('none'):
url = 'http://httpbin.org/get?a=1'
r = self.session.request('GET', url,
auth=GuessAuth('user', 'passwd'))
j = r.json()
assert j['args'] == {'a': '1'}
assert j['url'] == url
assert 'user' not in r.text
assert 'passwd' not in r.text
class TestGuessProxyAuth(unittest.TestCase):
@mock.patch('requests_toolbelt.auth.http_proxy_digest.HTTPProxyDigestAuth.handle_407')
def test_handle_407_header_digest(self, mock_handle_407):
r = requests.Response()
r.headers['Proxy-Authenticate'] = 'Digest nonce="d2b19757d3d656a283c99762cbd1097b", opaque="1c311ad1cc6e6183b83bc75f95a57893", realm="me@kennethreitz.com", qop=auth'
guess_auth = GuessProxyAuth(None, None, "user", "passwd")
guess_auth.handle_407(r)
mock_handle_407.assert_called_with(r)
@mock.patch('requests.auth.HTTPProxyAuth.__call__')
@mock.patch('requests.cookies.extract_cookies_to_jar')
def test_handle_407_header_basic(self, extract_cookies_to_jar, proxy_auth_call):
req = mock.Mock()
r = mock.Mock()
r.headers = dict()
r.request.copy.return_value = req
proxy_auth_call.return_value = requests.Response()
kwargs = {}
r.headers['Proxy-Authenticate'] = 'Basic realm="Fake Realm"'
guess_auth = GuessProxyAuth(None, None, "user", "passwd")
guess_auth.handle_407(r, **kwargs)
proxy_auth_call.assert_called_with(req)

View File

@ -0,0 +1,58 @@
import requests
from requests.auth import HTTPBasicAuth
from requests_toolbelt.auth.handler import AuthHandler
from requests_toolbelt.auth.handler import NullAuthStrategy
def test_turns_tuples_into_basic_auth():
a = AuthHandler({'http://example.com': ('foo', 'bar')})
strategy = a.get_strategy_for('http://example.com')
assert not isinstance(strategy, NullAuthStrategy)
assert isinstance(strategy, HTTPBasicAuth)
def test_uses_null_strategy_for_non_matching_domains():
a = AuthHandler({'http://api.example.com': ('foo', 'bar')})
strategy = a.get_strategy_for('http://example.com')
assert isinstance(strategy, NullAuthStrategy)
def test_normalizes_domain_keys():
a = AuthHandler({'https://API.github.COM': ('foo', 'bar')})
assert 'https://api.github.com' in a.strategies
assert 'https://API.github.COM' not in a.strategies
def test_can_add_new_strategies():
a = AuthHandler({'https://example.com': ('foo', 'bar')})
a.add_strategy('https://api.github.com', ('fiz', 'baz'))
assert isinstance(
a.get_strategy_for('https://api.github.com'),
HTTPBasicAuth
)
def test_prepares_auth_correctly():
# Set up our Session and AuthHandler
auth = AuthHandler({
'https://api.example.com': ('bar', 'baz'),
'https://httpbin.org': ('biz', 'fiz'),
})
s = requests.Session()
s.auth = auth
# Set up a valid GET request to https://api.example.com/users
r1 = requests.Request('GET', 'https://api.example.com/users')
p1 = s.prepare_request(r1)
assert p1.headers['Authorization'] == 'Basic YmFyOmJheg=='
# Set up a valid POST request to https://httpbin.org/post
r2 = requests.Request('POST', 'https://httpbin.org/post', data='foo')
p2 = s.prepare_request(r2)
assert p2.headers['Authorization'] == 'Basic Yml6OmZpeg=='
# Set up an *invalid* OPTIONS request to http://api.example.com
# NOTE(sigmavirus24): This is not because of the verb but instead because
# it is the wrong URI scheme.
r3 = requests.Request('OPTIONS', 'http://api.example.com/projects')
p3 = s.prepare_request(r3)
assert p3.headers.get('Authorization') is None

220
tests/test_downloadutils.py Normal file
View File

@ -0,0 +1,220 @@
"""Tests for the utils module."""
import io
import os
import os.path
import shutil
import tempfile
import requests
from requests_toolbelt.downloadutils import stream
from requests_toolbelt.downloadutils import tee
import mock
import pytest
from . import get_betamax
preserve_bytes = {'preserve_exact_body_bytes': True}
def test_get_download_file_path_uses_content_disposition():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
filename = 'github3.py-0.7.1-py2.py3-none-any.whl'
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'})
path = stream.get_download_file_path(r, None)
r.close()
assert path == filename
def test_get_download_file_path_directory():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
filename = 'github3.py-0.7.1-py2.py3-none-any.whl'
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'})
path = stream.get_download_file_path(r, tempfile.tempdir)
r.close()
assert path == os.path.join(tempfile.tempdir, filename)
def test_get_download_file_path_specific_file():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'})
path = stream.get_download_file_path(r, '/arbitrary/file.path')
r.close()
assert path == '/arbitrary/file.path'
def test_stream_response_to_file_uses_content_disposition():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
filename = 'github3.py-0.7.1-py2.py3-none-any.whl'
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'},
stream=True)
stream.stream_response_to_file(r)
assert os.path.exists(filename)
os.unlink(filename)
def test_stream_response_to_specific_filename():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
filename = 'github3.py.whl'
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'},
stream=True)
stream.stream_response_to_file(r, path=filename)
assert os.path.exists(filename)
os.unlink(filename)
def test_stream_response_to_directory():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
td = tempfile.mkdtemp()
try:
filename = 'github3.py-0.7.1-py2.py3-none-any.whl'
expected_path = os.path.join(td, filename)
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'},
stream=True)
stream.stream_response_to_file(r, path=td)
assert os.path.exists(expected_path)
finally:
shutil.rmtree(td)
def test_stream_response_to_existing_file():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
filename = 'github3.py.whl'
with open(filename, 'w') as f_existing:
f_existing.write('test')
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'},
stream=True)
try:
stream.stream_response_to_file(r, path=filename)
except stream.exc.StreamingError as e:
assert str(e).startswith('File already exists:')
else:
assert False, "Should have raised a FileExistsError"
finally:
os.unlink(filename)
def test_stream_response_to_file_like_object():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
file_obj = io.BytesIO()
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'},
stream=True)
stream.stream_response_to_file(r, path=file_obj)
assert 0 < file_obj.tell()
def test_stream_response_to_file_chunksize():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
class FileWrapper(io.BytesIO):
def __init__(self):
super(FileWrapper, self).__init__()
self.chunk_sizes = []
def write(self, data):
self.chunk_sizes.append(len(data))
return super(FileWrapper, self).write(data)
file_obj = FileWrapper()
chunksize = 1231
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'},
stream=True)
stream.stream_response_to_file(r, path=file_obj, chunksize=chunksize)
assert 0 < file_obj.tell()
assert len(file_obj.chunk_sizes) >= 1
assert file_obj.chunk_sizes[0] == chunksize
@pytest.fixture
def streamed_response(chunks=None):
chunks = chunks or [b'chunk'] * 8
response = mock.MagicMock()
response.raw.stream.return_value = chunks
return response
def test_tee(streamed_response):
response = streamed_response
expected_len = len('chunk') * 8
fileobject = io.BytesIO()
assert expected_len == sum(len(c) for c in tee.tee(response, fileobject))
assert fileobject.getvalue() == b'chunkchunkchunkchunkchunkchunkchunkchunk'
def test_tee_rejects_StringIO():
fileobject = io.StringIO()
with pytest.raises(TypeError):
# The generator needs to be iterated over before the exception will be
# raised
sum(len(c) for c in tee.tee(None, fileobject))
def test_tee_to_file(streamed_response):
response = streamed_response
expected_len = len('chunk') * 8
assert expected_len == sum(
len(c) for c in tee.tee_to_file(response, 'tee.txt')
)
assert os.path.exists('tee.txt')
os.remove('tee.txt')
def test_tee_to_bytearray(streamed_response):
response = streamed_response
arr = bytearray()
expected_arr = bytearray(b'chunk' * 8)
expected_len = len(expected_arr)
assert expected_len == sum(
len(c) for c in tee.tee_to_bytearray(response, arr)
)
assert expected_arr == arr
def test_tee_to_bytearray_only_accepts_bytearrays():
with pytest.raises(TypeError):
tee.tee_to_bytearray(None, object())

382
tests/test_dump.py Normal file
View File

@ -0,0 +1,382 @@
"""Collection of tests for utils.dump.
The dump utility module only has two public attributes:
- dump_response
- dump_all
This module, however, tests many of the private implementation details since
those public functions just wrap them and testing the public functions will be
very complex and high-level.
"""
from requests_toolbelt._compat import HTTPHeaderDict
from requests_toolbelt.utils import dump
import mock
import pytest
import requests
from . import get_betamax
HTTP_1_1 = 11
HTTP_1_0 = 10
HTTP_0_9 = 9
HTTP_UNKNOWN = 5000
class TestSimplePrivateFunctions(object):
"""Excercise simple private functions in one logical place."""
def test_coerce_to_bytes_skips_byte_strings(self):
"""Show that _coerce_to_bytes skips bytes input."""
bytestr = b'some bytes'
assert dump._coerce_to_bytes(bytestr) is bytestr
def test_coerce_to_bytes_converts_text(self):
"""Show that _coerce_to_bytes handles text input."""
bytestr = b'some bytes'
text = bytestr.decode('utf-8')
assert dump._coerce_to_bytes(text) == bytestr
def test_format_header(self):
"""Prove that _format_header correctly formats bytes input."""
header = b'Connection'
value = b'close'
expected = b'Connection: close\r\n'
assert dump._format_header(header, value) == expected
def test_format_header_handles_unicode(self):
"""Prove that _format_header correctly formats text input."""
header = b'Connection'.decode('utf-8')
value = b'close'.decode('utf-8')
expected = b'Connection: close\r\n'
assert dump._format_header(header, value) == expected
def test_build_request_path(self):
"""Show we get the right request path for a normal request."""
path, _ = dump._build_request_path(
'https://example.com/foo/bar', {}
)
assert path == b'/foo/bar'
def test_build_request_path_with_query_string(self):
"""Show we include query strings appropriately."""
path, _ = dump._build_request_path(
'https://example.com/foo/bar?query=data', {}
)
assert path == b'/foo/bar?query=data'
def test_build_request_path_with_proxy_info(self):
"""Show that we defer to the proxy request_path info."""
path, _ = dump._build_request_path(
'https://example.com/', {
'request_path': b'https://example.com/test'
}
)
assert path == b'https://example.com/test'
class RequestResponseMixin(object):
"""Mix-in for test classes needing mocked requests and responses."""
response_spec = [
'connection',
'content',
'raw',
'reason',
'request',
'url',
]
request_spec = [
'body',
'headers',
'method',
'url',
]
httpresponse_spec = [
'headers',
'reason',
'status',
'version',
]
adapter_spec = [
'proxy_manager',
]
@pytest.fixture(autouse=True)
def set_up(self):
"""xUnit style autoused fixture creating mocks."""
self.response = mock.Mock(spec=self.response_spec)
self.request = mock.Mock(spec=self.request_spec)
self.httpresponse = mock.Mock(spec=self.httpresponse_spec)
self.adapter = mock.Mock(spec=self.adapter_spec)
self.response.connection = self.adapter
self.response.request = self.request
self.response.raw = self.httpresponse
def configure_response(self, content=b'', proxy_manager=None, url=None,
reason=b''):
"""Helper function to configure a mocked response."""
self.adapter.proxy_manager = proxy_manager or {}
self.response.content = content
self.response.url = url
self.response.reason = reason
def configure_request(self, body=b'', headers=None, method=None,
url=None):
"""Helper function to configure a mocked request."""
self.request.body = body
self.request.headers = headers or {}
self.request.method = method
self.request.url = url
def configure_httpresponse(self, headers=None, reason=b'', status=200,
version=HTTP_1_1):
"""Helper function to configure a mocked urllib3 response."""
self.httpresponse.headers = HTTPHeaderDict(headers or {})
self.httpresponse.reason = reason
self.httpresponse.status = status
self.httpresponse.version = version
class TestResponsePrivateFunctions(RequestResponseMixin):
"""Excercise private functions using responses."""
def test_get_proxy_information_sans_proxy(self):
"""Show no information is returned when not using a proxy."""
self.configure_response()
assert dump._get_proxy_information(self.response) is None
def test_get_proxy_information_with_proxy_over_http(self):
"""Show only the request path is returned for HTTP requests.
Using HTTP over a proxy doesn't alter anything except the request path
of the request. The method doesn't change, so a dictionary with
request_path as its only key is all that should be returned.
"""
self.configure_response(
proxy_manager={'http://': 'http://local.proxy:3939'},
)
self.configure_request(
url='http://example.com',
method='GET',
)
assert dump._get_proxy_information(self.response) == {
'request_path': 'http://example.com'
}
def test_get_proxy_information_with_proxy_over_https(self):
"""Show that the request path and method are returned for HTTPS reqs.
Using HTTPS over a proxy changes the method used and the request path.
"""
self.configure_response(
proxy_manager={'http://': 'http://local.proxy:3939'},
)
self.configure_request(
url='https://example.com',
method='GET',
)
assert dump._get_proxy_information(self.response) == {
'method': 'CONNECT',
'request_path': 'https://example.com'
}
def test_dump_request_data(self):
"""Build up the request data into a bytearray."""
self.configure_request(
url='http://example.com/',
method='GET',
)
array = bytearray()
prefixes = dump.PrefixSettings('request:', 'response:')
dump._dump_request_data(
request=self.request,
prefixes=prefixes,
bytearr=array,
proxy_info={},
)
assert b'request:GET / HTTP/1.1\r\n' in array
assert b'request:Host: example.com\r\n' in array
def test_dump_request_data_with_proxy_info(self):
"""Build up the request data into a bytearray."""
self.configure_request(
url='http://example.com/',
method='GET',
)
array = bytearray()
prefixes = dump.PrefixSettings('request:', 'response:')
dump._dump_request_data(
request=self.request,
prefixes=prefixes,
bytearr=array,
proxy_info={
'request_path': b'fake-request-path',
'method': b'CONNECT',
},
)
assert b'request:CONNECT fake-request-path HTTP/1.1\r\n' in array
assert b'request:Host: example.com\r\n' in array
def test_dump_response_data(self):
"""Build up the response data into a bytearray."""
self.configure_response(
url='https://example.com/redirected',
content=b'foobarbogus',
reason=b'OK',
)
self.configure_httpresponse(
headers={'Content-Type': 'application/json'},
reason=b'OK',
status=201,
)
array = bytearray()
prefixes = dump.PrefixSettings('request:', 'response:')
dump._dump_response_data(
response=self.response,
prefixes=prefixes,
bytearr=array,
)
assert b'response:HTTP/1.1 201 OK\r\n' in array
assert b'response:Content-Type: application/json\r\n' in array
def test_dump_response_data_with_older_http_version(self):
"""Build up the response data into a bytearray."""
self.configure_response(
url='https://example.com/redirected',
content=b'foobarbogus',
reason=b'OK',
)
self.configure_httpresponse(
headers={'Content-Type': 'application/json'},
reason=b'OK',
status=201,
version=HTTP_0_9,
)
array = bytearray()
prefixes = dump.PrefixSettings('request:', 'response:')
dump._dump_response_data(
response=self.response,
prefixes=prefixes,
bytearr=array,
)
assert b'response:HTTP/0.9 201 OK\r\n' in array
assert b'response:Content-Type: application/json\r\n' in array
def test_dump_response_data_with_unknown_http_version(self):
"""Build up the response data into a bytearray."""
self.configure_response(
url='https://example.com/redirected',
content=b'foobarbogus',
reason=b'OK',
)
self.configure_httpresponse(
headers={'Content-Type': 'application/json'},
reason=b'OK',
status=201,
version=HTTP_UNKNOWN,
)
array = bytearray()
prefixes = dump.PrefixSettings('request:', 'response:')
dump._dump_response_data(
response=self.response,
prefixes=prefixes,
bytearr=array,
)
assert b'response:HTTP/? 201 OK\r\n' in array
assert b'response:Content-Type: application/json\r\n' in array
class TestResponsePublicFunctions(RequestResponseMixin):
"""Excercise public functions using responses."""
def test_dump_response_fails_without_request(self):
"""Show that a response without a request raises a ValueError."""
del self.response.request
assert hasattr(self.response, 'request') is False
with pytest.raises(ValueError):
dump.dump_response(self.response)
def test_dump_response_uses_provided_bytearray(self):
"""Show that users providing bytearrays receive those back."""
self.configure_request(
url='http://example.com/',
method='GET',
)
self.configure_response(
url='https://example.com/redirected',
content=b'foobarbogus',
reason=b'OK',
)
self.configure_httpresponse(
headers={'Content-Type': 'application/json'},
reason=b'OK',
status=201,
)
arr = bytearray()
retarr = dump.dump_response(self.response, data_array=arr)
assert retarr is arr
class TestDumpRealResponses(object):
"""Exercise dump utilities against real data."""
def test_dump_response(self):
session = requests.Session()
recorder = get_betamax(session)
with recorder.use_cassette('simple_get_request'):
response = session.get('https://httpbin.org/get')
arr = dump.dump_response(response)
assert b'< GET /get HTTP/1.1\r\n' in arr
assert b'< Host: httpbin.org\r\n' in arr
# NOTE(sigmavirus24): The ? below is only because Betamax doesn't
# preserve which HTTP version the server reports as supporting.
# When not using Betamax, there should be a different version
# reported.
assert b'> HTTP/? 200 OK\r\n' in arr
assert b'> Content-Type: application/json\r\n' in arr
def test_dump_all(self):
session = requests.Session()
recorder = get_betamax(session)
with recorder.use_cassette('redirect_request_for_dump_all'):
response = session.get('https://httpbin.org/redirect/5')
arr = dump.dump_all(response)
assert b'< GET /redirect/5 HTTP/1.1\r\n' in arr
assert b'> Location: /relative-redirect/4\r\n' in arr
assert b'< GET /relative-redirect/4 HTTP/1.1\r\n' in arr
assert b'> Location: /relative-redirect/3\r\n' in arr
assert b'< GET /relative-redirect/3 HTTP/1.1\r\n' in arr
assert b'> Location: /relative-redirect/2\r\n' in arr
assert b'< GET /relative-redirect/2 HTTP/1.1\r\n' in arr
assert b'> Location: /relative-redirect/1\r\n' in arr
assert b'< GET /relative-redirect/1 HTTP/1.1\r\n' in arr
assert b'> Location: /get\r\n' in arr
assert b'< GET /get HTTP/1.1\r\n' in arr

View File

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
import requests
import unittest
from requests_toolbelt.adapters.fingerprint import FingerprintAdapter
from . import get_betamax
class TestFingerprintAdapter(unittest.TestCase):
HTTP2BIN_FINGERPRINT = 'abf8683eeba8521ad2e8dc48e92a1cbea3ff8608f1417948fdad75d7b50eb264'
def setUp(self):
self.session = requests.Session()
self.session.mount('https://http2bin.org', FingerprintAdapter(self.HTTP2BIN_FINGERPRINT))
self.recorder = get_betamax(self.session)
def test_fingerprint(self):
with self.recorder.use_cassette('http2bin_fingerprint'):
r = self.session.get('https://http2bin.org/get')
assert r.status_code == 200

View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
import requests
import unittest
from requests_toolbelt.cookies.forgetful import ForgetfulCookieJar
from . import get_betamax
class TestForgetfulCookieJar(unittest.TestCase):
def setUp(self):
self.session = requests.Session()
self.session.cookies = ForgetfulCookieJar()
self.recorder = get_betamax(self.session)
def test_cookies_are_ignored(self):
with self.recorder.use_cassette('http2bin_cookies'):
url = 'https://httpbin.org/cookies/set'
cookies = {
'cookie0': 'value0',
}
r = self.session.request(
'GET', url,
params=cookies
)
assert 'cookie0' not in self.session.cookies

76
tests/test_formdata.py Normal file
View File

@ -0,0 +1,76 @@
"""Test module for requests_toolbelt.utils.formdata."""
try:
from urllib.parse import parse_qs
except ImportError:
from urlparse import parse_qs
from requests_toolbelt.utils.formdata import urlencode
import pytest
dict_query = {
'first_nested': {
'second_nested': {
'third_nested': {
'fourth0': 'fourth_value0',
'fourth1': 'fourth_value1',
},
'third0': 'third_value0',
},
'second0': 'second_value0',
},
'outter': 'outter_value',
}
list_query = [
('first_nested', [
('second_nested', [
('third_nested', [
('fourth0', 'fourth_value0'),
('fourth1', 'fourth_value1'),
]),
('third0', 'third_value0'),
]),
('second0', 'second_value0'),
]),
('outter', 'outter_value'),
]
mixed_dict_query = {
'first_nested': {
'second_nested': [
('third_nested', {
'fourth0': 'fourth_value0',
'fourth1': 'fourth_value1',
}),
('third0', 'third_value0'),
],
'second0': 'second_value0',
},
'outter': 'outter_value',
}
expected_parsed_query = {
'first_nested[second0]': ['second_value0'],
'first_nested[second_nested][third0]': ['third_value0'],
'first_nested[second_nested][third_nested][fourth0]': ['fourth_value0'],
'first_nested[second_nested][third_nested][fourth1]': ['fourth_value1'],
'outter': ['outter_value'],
}
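# Illustrative sketch (not part of the original tests): urlencode flattens
# nested mappings into bracketed keys like those above. Pair order in the
# encoded string is not guaranteed, which is why these tests compare parsed
# dictionaries rather than raw query strings.
def _example_flatten():
    encoded = urlencode({'outer': {'inner': 'value'}})
    assert parse_qs(encoded) == {'outer[inner]': ['value']}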
@pytest.mark.parametrize("query", [dict_query, list_query, mixed_dict_query])
def test_urlencode_flattens_nested_structures(query):
"""Show that when parsed, the structure is conveniently flat."""
parsed = parse_qs(urlencode(query))
assert parsed == expected_parsed_query
def test_urlencode_catches_invalid_input():
"""Show that queries are loosely validated."""
with pytest.raises(ValueError):
urlencode(['fo'])
with pytest.raises(ValueError):
urlencode([('foo', 'bar', 'bogus')])

View File

@ -0,0 +1,48 @@
import pytest
import requests
from requests_toolbelt.adapters import host_header_ssl as hhssl
@pytest.fixture
def session():
"""Create a session with our adapter mounted."""
session = requests.Session()
session.mount('https://', hhssl.HostHeaderSSLAdapter())
return session
@pytest.mark.skip
class TestHostHeaderSSLAdapter(object):
"""Tests for our HostHeaderSNIAdapter."""
def test_ssladapter(self, session):
# normal mode
r = session.get('https://example.org')
assert r.status_code == 200
# accessing IP address directly
r = session.get('https://93.184.216.34',
headers={"Host": "example.org"})
assert r.status_code == 200
# vHost
r = session.get('https://93.184.216.34',
headers={'Host': 'example.com'})
assert r.status_code == 200
def test_stream(self, session):
session.get('https://54.175.219.8/stream/20',
headers={'Host': 'httpbin.org'},
stream=True)
def test_case_insensitive_header(self, session):
r = session.get('https://93.184.216.34',
headers={'hOSt': 'example.org'})
assert r.status_code == 200
def test_plain_requests(self):
# test whether the reason for this adapter remains
# (may be implemented into requests in the future)
with pytest.raises(requests.exceptions.SSLError):
requests.get(url='https://93.184.216.34',
headers={'Host': 'example.org'})

View File

@ -0,0 +1,164 @@
# -*- coding: utf-8 -*-
import io
import sys
import unittest
import mock
import pytest
import requests
from requests_toolbelt.multipart.decoder import BodyPart
from requests_toolbelt.multipart.decoder import (
ImproperBodyPartContentException
)
from requests_toolbelt.multipart.decoder import MultipartDecoder
from requests_toolbelt.multipart.decoder import (
NonMultipartContentTypeException
)
from requests_toolbelt.multipart.encoder import encode_with
from requests_toolbelt.multipart.encoder import MultipartEncoder
class TestBodyPart(unittest.TestCase):
@staticmethod
def u(content):
major = sys.version_info[0]
if major == 3:
return content
else:
return unicode(content.replace(r'\\', r'\\\\'), 'unicode_escape')
@staticmethod
def bodypart_bytes_from_headers_and_values(headers, value, encoding):
return b'\r\n\r\n'.join(
[
b'\r\n'.join(
[
b': '.join([encode_with(i, encoding) for i in h])
for h in headers
]
),
encode_with(value, encoding)
]
)
def setUp(self):
self.header_1 = (TestBodyPart.u('Snowman'), TestBodyPart.u('☃'))
self.value_1 = TestBodyPart.u('©')
self.part_1 = BodyPart(
TestBodyPart.bodypart_bytes_from_headers_and_values(
(self.header_1,), self.value_1, 'utf-8'
),
'utf-8'
)
self.part_2 = BodyPart(
TestBodyPart.bodypart_bytes_from_headers_and_values(
[], self.value_1, 'utf-16'
),
'utf-16'
)
def test_equality_content_should_be_equal(self):
part_3 = BodyPart(
TestBodyPart.bodypart_bytes_from_headers_and_values(
[], self.value_1, 'utf-8'
),
'utf-8'
)
assert self.part_1.content == part_3.content
def test_equality_content_equals_bytes(self):
assert self.part_1.content == encode_with(self.value_1, 'utf-8')
def test_equality_content_should_not_be_equal(self):
assert self.part_1.content != self.part_2.content
def test_equality_content_does_not_equal_bytes(self):
assert self.part_1.content != encode_with(self.value_1, 'latin-1')
def test_changing_encoding_changes_text(self):
part_2_orig_text = self.part_2.text
self.part_2.encoding = 'latin-1'
assert self.part_2.text != part_2_orig_text
def test_text_should_be_equal(self):
assert self.part_1.text == self.part_2.text
def test_no_headers(self):
sample_1 = b'\r\n\r\nNo headers\r\nTwo lines'
part_3 = BodyPart(sample_1, 'utf-8')
assert len(part_3.headers) == 0
assert part_3.content == b'No headers\r\nTwo lines'
def test_no_crlf_crlf_in_content(self):
content = b'no CRLF CRLF here!\r\n'
with pytest.raises(ImproperBodyPartContentException):
BodyPart(content, 'utf-8')
class TestMultipartDecoder(unittest.TestCase):
def setUp(self):
self.sample_1 = (
('field 1', 'value 1'),
('field 2', 'value 2'),
('field 3', 'value 3'),
('field 4', 'value 4'),
)
self.boundary = 'test boundary'
self.encoded_1 = MultipartEncoder(self.sample_1, self.boundary)
self.decoded_1 = MultipartDecoder(
self.encoded_1.to_string(),
self.encoded_1.content_type
)
def test_non_multipart_response_fails(self):
jpeg_response = mock.NonCallableMagicMock(spec=requests.Response)
jpeg_response.headers = {'content-type': 'image/jpeg'}
with pytest.raises(NonMultipartContentTypeException):
MultipartDecoder.from_response(jpeg_response)
def test_length_of_parts(self):
assert len(self.sample_1) == len(self.decoded_1.parts)
def test_content_of_parts(self):
def parts_equal(part, sample):
return part.content == encode_with(sample[1], 'utf-8')
parts_iter = zip(self.decoded_1.parts, self.sample_1)
assert all(parts_equal(part, sample) for part, sample in parts_iter)
def test_header_of_parts(self):
def parts_header_equal(part, sample):
return part.headers[b'Content-Disposition'] == encode_with(
'form-data; name="{0}"'.format(sample[0]), 'utf-8'
)
parts_iter = zip(self.decoded_1.parts, self.sample_1)
assert all(
parts_header_equal(part, sample)
for part, sample in parts_iter
)
def test_from_response(self):
response = mock.NonCallableMagicMock(spec=requests.Response)
response.headers = {
'content-type': 'multipart/related; boundary="samp1"'
}
cnt = io.BytesIO()
cnt.write(b'\r\n--samp1\r\n')
cnt.write(b'Header-1: Header-Value-1\r\n')
cnt.write(b'Header-2: Header-Value-2\r\n')
cnt.write(b'\r\n')
cnt.write(b'Body 1, Line 1\r\n')
cnt.write(b'Body 1, Line 2\r\n')
cnt.write(b'--samp1\r\n')
cnt.write(b'\r\n')
cnt.write(b'Body 2, Line 1\r\n')
cnt.write(b'--samp1--\r\n')
response.content = cnt.getvalue()
decoder_2 = MultipartDecoder.from_response(response)
assert decoder_2.content_type == response.headers['content-type']
assert (
decoder_2.parts[0].content == b'Body 1, Line 1\r\nBody 1, Line 2'
)
assert decoder_2.parts[0].headers[b'Header-1'] == b'Header-Value-1'
assert len(decoder_2.parts[1].headers) == 0
assert decoder_2.parts[1].content == b'Body 2, Line 1'

View File

@ -0,0 +1,260 @@
# -*- coding: utf-8 -*-
import unittest
import io
from requests_toolbelt.multipart.encoder import CustomBytesIO, MultipartEncoder
from requests_toolbelt._compat import filepost
class LargeFileMock(object):
def __init__(self):
# Let's keep track of how many bytes we've given
self.bytes_read = 0
# Our limit (1GB)
self.bytes_max = 1024 * 1024 * 1024
# Fake name
self.name = 'fake_name.py'
# Create a fileno attribute
self.fileno = None
def __len__(self):
return self.bytes_max
def read(self, size=None):
if self.bytes_read >= self.bytes_max:
return b''
if size is None:
length = self.bytes_max - self.bytes_read
else:
length = size
length = int(length)
length = min([length, self.bytes_max - self.bytes_read])
self.bytes_read += length
return b'a' * length
def tell(self):
return self.bytes_read
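# Sketch (not part of the original tests): MultipartEncoder reads wrapped
# file objects lazily, so a body the size of LargeFileMock is never held in
# memory all at once. The upload itself is left commented out because it
# would hit the network; the URL is illustrative only.
def _example_streaming_upload():
    encoder = MultipartEncoder({'file': ('fake_name.py', LargeFileMock())})
    # import requests
    # requests.post('https://example.com/upload', data=encoder,
    #               headers={'Content-Type': encoder.content_type})
    return encoder.content_type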
class TestCustomBytesIO(unittest.TestCase):
def setUp(self):
self.instance = CustomBytesIO()
def test_writable(self):
assert hasattr(self.instance, 'write')
assert self.instance.write(b'example') == 7
def test_readable(self):
assert hasattr(self.instance, 'read')
assert self.instance.read() == b''
assert self.instance.read(10) == b''
def test_can_read_after_writing_to(self):
self.instance.write(b'example text')
assert self.instance.read() == b'example text'
def test_can_read_some_after_writing_to(self):
self.instance.write(b'example text')
assert self.instance.read(6) == b'exampl'
def test_can_get_length(self):
self.instance.write(b'example')
self.instance.seek(0, 0)
assert self.instance.len == 7
def test_truncates_intelligently(self):
self.instance.write(b'abcdefghijklmnopqrstuvwxyzabcd') # 30 bytes
assert self.instance.tell() == 30
self.instance.seek(-10, 2)
self.instance.smart_truncate()
assert self.instance.len == 10
assert self.instance.read() == b'uvwxyzabcd'
assert self.instance.tell() == 10
def test_accepts_encoded_strings_with_unicode(self):
"""Accepts a string with encoded unicode characters."""
s = b'this is a unicode string: \xc3\xa9 \xc3\xa1 \xc7\xab \xc3\xb3'
self.instance = CustomBytesIO(s)
assert self.instance.read() == s
class TestMultipartEncoder(unittest.TestCase):
def setUp(self):
self.parts = [('field', 'value'), ('other_field', 'other_value')]
self.boundary = 'this-is-a-boundary'
self.instance = MultipartEncoder(self.parts, boundary=self.boundary)
def test_to_string(self):
assert self.instance.to_string() == (
'--this-is-a-boundary\r\n'
'Content-Disposition: form-data; name="field"\r\n\r\n'
'value\r\n'
'--this-is-a-boundary\r\n'
'Content-Disposition: form-data; name="other_field"\r\n\r\n'
'other_value\r\n'
'--this-is-a-boundary--\r\n'
).encode()
def test_content_type(self):
expected = 'multipart/form-data; boundary=this-is-a-boundary'
assert self.instance.content_type == expected
def test_encodes_data_the_same(self):
encoded = filepost.encode_multipart_formdata(self.parts,
self.boundary)[0]
assert encoded == self.instance.read()
def test_streams_its_data(self):
large_file = LargeFileMock()
parts = {'some field': 'value',
'some file': large_file,
}
encoder = MultipartEncoder(parts)
total_size = encoder.len
read_size = 1024 * 1024 * 128
already_read = 0
while True:
read = encoder.read(read_size)
already_read += len(read)
if not read:
break
assert encoder._buffer.tell() <= read_size
assert already_read == total_size
def test_length_is_correct(self):
encoded = filepost.encode_multipart_formdata(self.parts,
self.boundary)[0]
assert len(encoded) == self.instance.len
def test_encodes_with_readable_data(self):
s = io.BytesIO(b'value')
m = MultipartEncoder([('field', s)], boundary=self.boundary)
assert m.read() == (
'--this-is-a-boundary\r\n'
'Content-Disposition: form-data; name="field"\r\n\r\n'
'value\r\n'
'--this-is-a-boundary--\r\n'
).encode()
def test_reads_open_file_objects(self):
with open('setup.py', 'rb') as fd:
m = MultipartEncoder([('field', 'foo'), ('file', fd)])
assert m.read() is not None
def test_reads_open_file_objects_with_a_specified_filename(self):
with open('setup.py', 'rb') as fd:
m = MultipartEncoder(
[('field', 'foo'), ('file', ('filename', fd, 'text/plain'))]
)
assert m.read() is not None
def test_reads_open_file_objects_using_to_string(self):
with open('setup.py', 'rb') as fd:
m = MultipartEncoder([('field', 'foo'), ('file', fd)])
assert m.to_string() is not None
def test_handles_encoded_unicode_strings(self):
m = MultipartEncoder([
('field',
b'this is a unicode string: \xc3\xa9 \xc3\xa1 \xc7\xab \xc3\xb3')
])
assert m.read() is not None
def test_handles_unicode_strings(self):
s = b'this is a unicode string: \xc3\xa9 \xc3\xa1 \xc7\xab \xc3\xb3'
m = MultipartEncoder([
('field', s.decode('utf-8'))
])
assert m.read() is not None
def test_regression_1(self):
"""Ensure issue #31 doesn't ever happen again."""
fields = {
"test": "t" * 100
}
for x in range(30):
fields['f%d' % x] = (
'test', open('tests/test_multipart_encoder.py', 'rb')
)
m = MultipartEncoder(fields=fields)
total_size = m.len
blocksize = 8192
read_so_far = 0
while True:
data = m.read(blocksize)
if not data:
break
read_so_far += len(data)
assert read_so_far == total_size
def test_regression_2(self):
"""Ensure issue #31 doesn't ever happen again."""
fields = {
"test": "t" * 8100
}
m = MultipartEncoder(fields=fields)
total_size = m.len
blocksize = 8192
read_so_far = 0
while True:
data = m.read(blocksize)
if not data:
break
read_so_far += len(data)
assert read_so_far == total_size
def test_handles_empty_unicode_values(self):
"""Verify that the Encoder can handle empty unicode strings.
See https://github.com/sigmavirus24/requests-toolbelt/issues/46 for
more context.
"""
fields = [(b'test'.decode('utf-8'), b''.decode('utf-8'))]
m = MultipartEncoder(fields=fields)
assert len(m.read()) > 0
def test_accepts_custom_content_type(self):
"""Verify that the Encoder handles custom content-types.
See https://github.com/sigmavirus24/requests-toolbelt/issues/52
"""
fields = [
(b'test'.decode('utf-8'), (b'filename'.decode('utf-8'),
b'filecontent',
b'application/json'.decode('utf-8')))
]
m = MultipartEncoder(fields=fields)
output = m.read().decode('utf-8')
assert output.index('Content-Type: application/json\r\n') > 0
def test_accepts_custom_headers(self):
"""Verify that the Encoder handles custom headers.
See https://github.com/sigmavirus24/requests-toolbelt/issues/52
"""
fields = [
(b'test'.decode('utf-8'), (b'filename'.decode('utf-8'),
b'filecontent',
b'application/json'.decode('utf-8'),
{'X-My-Header': 'my-value'}))
]
m = MultipartEncoder(fields=fields)
output = m.read().decode('utf-8')
assert output.index('X-My-Header: my-value\r\n') > 0
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
import math
import unittest
from requests_toolbelt.multipart.encoder import (
IDENTITY, MultipartEncoder, MultipartEncoderMonitor
)
class TestMultipartEncoderMonitor(unittest.TestCase):
def setUp(self):
self.fields = {'a': 'b'}
self.boundary = 'thisisaboundary'
self.encoder = MultipartEncoder(self.fields, self.boundary)
self.monitor = MultipartEncoderMonitor(self.encoder)
def test_content_type(self):
assert self.monitor.content_type == self.encoder.content_type
def test_length(self):
assert self.encoder.len == self.monitor.len
def test_read(self):
new_encoder = MultipartEncoder(self.fields, self.boundary)
assert new_encoder.read() == self.monitor.read()
def test_callback_called_when_reading_everything(self):
callback = Callback(self.monitor)
self.monitor.callback = callback
self.monitor.read()
assert callback.called == 1
def test_callback(self):
callback = Callback(self.monitor)
self.monitor.callback = callback
chunk_size = int(math.ceil(self.encoder.len / 4.0))
while self.monitor.read(chunk_size):
pass
assert callback.called == 5
def test_bytes_read(self):
bytes_to_read = self.encoder.len
self.monitor.read()
assert self.monitor.bytes_read == bytes_to_read
def test_default_callable_is_the_identity(self):
assert self.monitor.callback == IDENTITY
assert IDENTITY(1) == 1
def test_from_fields(self):
monitor = MultipartEncoderMonitor.from_fields(
self.fields, self.boundary
)
assert isinstance(monitor, MultipartEncoderMonitor)
assert isinstance(monitor.encoder, MultipartEncoder)
assert monitor.encoder.boundary_value == self.boundary
class Callback(object):
def __init__(self, monitor):
self.called = 0
self.monitor = monitor
def __call__(self, monitor):
self.called += 1
assert monitor == self.monitor
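# Sketch (not part of the original tests): a real-world monitor usually wires
# a progress callback around the encoder; bytes_read grows as the body is
# consumed.
def _example_progress_monitor():
    def report(monitor):
        print('{0} of {1} bytes read'.format(monitor.bytes_read, monitor.len))
    return MultipartEncoderMonitor.from_fields(
        fields={'field': 'value'}, callback=report
    )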

View File

@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
"""Test proxy digest authentication."""
import unittest
import mock
import requests
from requests_toolbelt.auth import http_proxy_digest
class TestProxyDigestAuth(unittest.TestCase):
"""Tests for the ProxyDigestAuth class."""
def setUp(self):
"""Set up variables for each test."""
self.username = "username"
self.password = "password"
self.auth = http_proxy_digest.HTTPProxyDigestAuth(
self.username, self.password
)
self.prepared_request = requests.Request(
'GET',
'http://host.org/index.html'
).prepare()
def test_with_existing_nonce(self):
"""Test if it will generate Proxy-Auth header when nonce present.
Digest authentication's correctness will not be tested here.
"""
self.auth.last_nonce = "bH3FVAAAAAAg74rL3X8AAI3CyBAAAAAA"
self.auth.chal = {
'nonce': self.auth.last_nonce,
'realm': 'testrealm@host.org',
'qop': 'auth'
}
# prepared_request headers should be clear before calling auth
assert self.prepared_request.headers.get('Proxy-Authorization') is None
self.auth(self.prepared_request)
assert self.prepared_request.headers['Proxy-Authorization'] is not None
def test_no_challenge(self):
"""Test that a response containing no auth challenge is left alone."""
connection = MockConnection()
first_response = connection.make_response(self.prepared_request)
first_response.status_code = 404
assert self.auth.last_nonce == ''
final_response = self.auth.handle_407(first_response)
headers = final_response.request.headers
assert self.auth.last_nonce == ''
assert first_response is final_response
assert headers.get('Proxy-Authorization') is None
def test_digest_challenge(self):
"""Test a response with a digest auth challenge causes a new request.
This ensures that the auth class generates a new request with a
Proxy-Authorization header.
Digest authentication's correctness will not be tested here.
"""
connection = MockConnection()
first_response = connection.make_response(self.prepared_request)
first_response.status_code = 407
first_response.headers['Proxy-Authenticate'] = (
'Digest'
' realm="Fake Realm", nonce="oS6WVgAAAABw698CAAAAAHAk/HUAAAAA",'
' qop="auth", stale=false'
)
assert self.auth.last_nonce == ''
final_response = self.auth.handle_407(first_response)
headers = final_response.request.headers
assert self.auth.last_nonce != ''
assert first_response is not final_response
assert headers.get('Proxy-Authorization') is not None
def test_ntlm_challenge(self):
"""Test a response without a Digest auth challenge is left alone."""
connection = MockConnection()
first_response = connection.make_response(self.prepared_request)
first_response.status_code = 407
first_response.headers['Proxy-Authenticate'] = 'NTLM'
assert self.auth.last_nonce == ''
final_response = self.auth.handle_407(first_response)
headers = final_response.request.headers
assert self.auth.last_nonce == ''
assert first_response is final_response
assert headers.get('Proxy-Authorization') is None
class MockConnection(object):
"""Fake connection object."""
def send(self, request, **kwargs):
"""Mock out the send method."""
return self.make_response(request)
def make_response(self, request):
"""Make a response for us based on the request."""
response = requests.Response()
response.status_code = 200
response.request = request
response.raw = mock.MagicMock()
response.connection = self
return response
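# Sketch (not part of the original tests): in normal use the auth object is
# passed to requests like any other auth handler; the proxy address below is
# illustrative only.
def _example_proxy_digest_usage():
    auth = http_proxy_digest.HTTPProxyDigestAuth('username', 'password')
    session = requests.Session()
    session.proxies = {'http': 'http://proxy.example.com:8080'}
    return session.get('http://host.org/index.html', auth=auth)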
if __name__ == '__main__':
unittest.main()

28
tests/test_sessions.py Normal file
View File

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
import unittest
import pytest
from requests_toolbelt import sessions
from . import get_betamax
class TestBasedSession(unittest.TestCase):
def test_with_base(self):
session = sessions.BaseUrlSession('https://httpbin.org/')
recorder = get_betamax(session)
with recorder.use_cassette('simple_get_request'):
response = session.get('/get')
response.raise_for_status()
def test_without_base(self):
session = sessions.BaseUrlSession()
with pytest.raises(ValueError):
session.get('/')
def test_override_base(self):
session = sessions.BaseUrlSession('https://www.google.com')
recorder = get_betamax(session)
with recorder.use_cassette('simple_get_request'):
response = session.get('https://httpbin.org/get')
response.raise_for_status()
assert response.json()['headers']['Host'] == 'httpbin.org'
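# Sketch (not part of the original tests): relative paths are joined onto the
# base URL with urljoin semantics, which create_url exposes directly.
def _example_create_url():
    session = sessions.BaseUrlSession(base_url='https://httpbin.org/')
    return session.create_url('get')  # 'https://httpbin.org/get'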

View File

@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
"""Tests for the SocketOptionsAdapter and TCPKeepAliveAdapter."""
import contextlib
import socket
import mock
import requests
from requests_toolbelt._compat import poolmanager
from requests_toolbelt.adapters import socket_options
@contextlib.contextmanager
def remove_keepidle():
"""A context manager to remove TCP_KEEPIDLE from socket."""
TCP_KEEPIDLE = getattr(socket, 'TCP_KEEPIDLE', None)
if TCP_KEEPIDLE is not None:
del socket.TCP_KEEPIDLE
yield
if TCP_KEEPIDLE is not None:
socket.TCP_KEEPIDLE = TCP_KEEPIDLE
@contextlib.contextmanager
def set_keepidle(value):
"""A context manager to set TCP_KEEPALIVE on socket always."""
TCP_KEEPIDLE = getattr(socket, 'TCP_KEEPIDLE', None)
socket.TCP_KEEPIDLE = value
yield
if TCP_KEEPIDLE is not None:
socket.TCP_KEEPIDLE = TCP_KEEPIDLE
else:
del socket.TCP_KEEPIDLE
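# Sketch (not part of the original tests): mounting the adapter applies the
# generated socket options to every connection the session makes.
def _example_mount_keepalive():
    session = requests.Session()
    session.mount('https://', socket_options.TCPKeepAliveAdapter(
        idle=120, interval=20, count=5))
    return session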
@mock.patch.object(requests, '__build__', 0x020500)
@mock.patch.object(poolmanager, 'PoolManager')
def test_options_passing_on_newer_requests(PoolManager):
"""Show that options are passed for a new enough version of requests."""
fake_opts = [('test', 'options', 'fake')]
adapter = socket_options.SocketOptionsAdapter(
socket_options=fake_opts,
pool_connections=10,
pool_maxsize=5,
pool_block=True,
)
PoolManager.assert_called_once_with(
num_pools=10, maxsize=5, block=True,
socket_options=fake_opts
)
assert adapter.socket_options == fake_opts
@mock.patch.object(requests, '__build__', 0x020300)
@mock.patch.object(poolmanager, 'PoolManager')
def test_options_not_passed_on_older_requests(PoolManager):
"""Show that options are not passed for older versions of requests."""
fake_opts = [('test', 'options', 'fake')]
socket_options.SocketOptionsAdapter(
socket_options=fake_opts,
pool_connections=10,
pool_maxsize=5,
pool_block=True,
)
assert PoolManager.called is False
@mock.patch.object(requests, '__build__', 0x020500)
@mock.patch.object(poolmanager, 'PoolManager')
def test_keep_alive_on_newer_requests_no_idle(PoolManager):
"""Show that options are generated correctly from kwargs."""
socket_opts = [
(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10),
(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 10),
]
with remove_keepidle():
adapter = socket_options.TCPKeepAliveAdapter(
idle=30, interval=10, count=10,
pool_connections=10,
pool_maxsize=5,
pool_block=True,
)
PoolManager.assert_called_once_with(
num_pools=10, maxsize=5, block=True,
socket_options=socket_opts
)
assert adapter.socket_options == socket_opts
@mock.patch.object(requests, '__build__', 0x020500)
@mock.patch.object(poolmanager, 'PoolManager')
def test_keep_alive_on_newer_requests_with_idle(PoolManager):
"""Show that options are generated correctly from kwargs with KEEPIDLE."""
with set_keepidle(3000):
socket_opts = [
(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10),
(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 10),
(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30),
]
adapter = socket_options.TCPKeepAliveAdapter(
idle=30, interval=10, count=10,
pool_connections=10,
pool_maxsize=5,
pool_block=True,
)
PoolManager.assert_called_once_with(
num_pools=10, maxsize=5, block=True,
socket_options=socket_opts
)
assert adapter.socket_options == socket_opts

View File

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
from requests.adapters import DEFAULT_POOLSIZE, DEFAULT_POOLBLOCK
from mock import patch
from requests_toolbelt.adapters.source import SourceAddressAdapter
import pytest
@patch('requests_toolbelt.adapters.source.poolmanager')
def test_source_address_adapter_string(poolmanager):
SourceAddressAdapter('10.10.10.10')
poolmanager.PoolManager.assert_called_once_with(
num_pools=DEFAULT_POOLSIZE,
maxsize=DEFAULT_POOLSIZE,
block=DEFAULT_POOLBLOCK,
source_address=('10.10.10.10', 0)
)
@patch('requests_toolbelt.adapters.source.poolmanager')
def test_source_address_adapter_tuple(poolmanager):
SourceAddressAdapter(('10.10.10.10', 80))
poolmanager.PoolManager.assert_called_once_with(
num_pools=DEFAULT_POOLSIZE,
maxsize=DEFAULT_POOLSIZE,
block=DEFAULT_POOLBLOCK,
source_address=('10.10.10.10', 80)
)
@patch('requests_toolbelt.adapters.source.poolmanager')
def test_source_address_adapter_type_error(poolmanager):
with pytest.raises(TypeError):
SourceAddressAdapter({'10.10.10.10': 80})
assert not poolmanager.PoolManager.called
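# Sketch (not part of the original tests): binding outgoing connections to a
# particular local address; 10.10.10.10 is illustrative only.
def _example_source_binding():
    import requests
    session = requests.Session()
    session.mount('http://', SourceAddressAdapter('10.10.10.10'))
    session.mount('https://', SourceAddressAdapter('10.10.10.10'))
    return session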

31
tests/test_ssladapter.py Normal file
View File

@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
import mock
import pytest
import requests
import unittest
from requests_toolbelt import SSLAdapter
from . import get_betamax
class TestSSLAdapter(unittest.TestCase):
def setUp(self):
self.session = requests.Session()
self.session.mount('https://', SSLAdapter('SSLv3'))
self.recorder = get_betamax(self.session)
def test_klevas(self):
with self.recorder.use_cassette('klevas_vu_lt_ssl3'):
r = self.session.get('https://klevas.vu.lt/')
assert r.status_code == 200
@pytest.mark.skipif(requests.__build__ < 0x020400,
reason="Requires Requests v2.4.0 or later")
@mock.patch('requests.packages.urllib3.poolmanager.ProxyManager')
def test_proxies(self, ProxyManager):
a = SSLAdapter('SSLv3')
a.proxy_manager_for('http://127.0.0.1:8888')
assert ProxyManager.call_count == 1
kwargs = ProxyManager.call_args_list[0][1]
assert kwargs['ssl_version'] == 'SSLv3'

View File

@ -0,0 +1,68 @@
import io
from requests_toolbelt.streaming_iterator import StreamingIterator
import pytest
@pytest.fixture(params=[True, False])
def get_iterable(request):
'''
When this fixture is used, the test is run twice -- once with the iterable
being a file-like object, once being an iterator.
'''
is_file = request.param
def inner(chunks):
if is_file:
return io.BytesIO(b''.join(chunks))
return iter(chunks)
return inner
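# Sketch (not part of the original tests): StreamingIterator exists so that a
# generator whose total length is known up front can be uploaded with a
# Content-Length header instead of chunked transfer encoding.
def _example_streaming_iterator():
    chunks = [b'here', b'are', b'some', b'chunks']
    size = sum(len(c) for c in chunks)
    return StreamingIterator(size, iter(chunks)).read()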
class TestStreamingIterator(object):
@pytest.fixture(autouse=True)
def setup(self, get_iterable):
self.chunks = [b'here', b'are', b'some', b'chunks']
self.size = 17
self.uploader = StreamingIterator(self.size, get_iterable(self.chunks))
def test_read_returns_all_chunks_in_one(self):
assert self.uploader.read() == b''.join(self.chunks)
def test_read_returns_empty_string_after_exhausting_the_iterator(self):
for i in range(0, 4):
self.uploader.read(8192)
assert self.uploader.read() == b''
assert self.uploader.read(8192) == b''
class TestStreamingIteratorWithLargeChunks(object):
@pytest.fixture(autouse=True)
def setup(self, get_iterable):
self.letters = [b'a', b'b', b'c', b'd', b'e']
self.chunks = (letter * 2000 for letter in self.letters)
self.size = 5 * 2000
self.uploader = StreamingIterator(self.size, get_iterable(self.chunks))
def test_returns_the_amount_requested(self):
chunk_size = 1000
bytes_read = 0
while True:
b = self.uploader.read(chunk_size)
if not b:
break
assert len(b) == chunk_size
bytes_read += len(b)
assert bytes_read == self.size
def test_returns_all_of_the_bytes(self):
chunk_size = 8192
bytes_read = 0
while True:
b = self.uploader.read(chunk_size)
if not b:
break
bytes_read += len(b)
assert bytes_read == self.size

105
tests/test_user_agent.py Normal file
View File

@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
import unittest
import sys
from mock import patch
import pytest
from requests_toolbelt.utils import user_agent as ua
class Object(object):
"""
A simple mock object that can have attributes added to it.
"""
pass
class TestUserAgentBuilder(unittest.TestCase):
def test_only_user_agent_name(self):
assert 'fake/1.0.0' == ua.UserAgentBuilder('fake', '1.0.0').build()
def test_includes_extras(self):
expected = 'fake/1.0.0 another-fake/2.0.1 yet-another-fake/17.1.0'
actual = ua.UserAgentBuilder('fake', '1.0.0').include_extras([
('another-fake', '2.0.1'),
('yet-another-fake', '17.1.0'),
]).build()
assert expected == actual
@patch('platform.python_implementation', return_value='CPython')
@patch('platform.python_version', return_value='2.7.13')
def test_include_implementation(self, *_):
expected = 'fake/1.0.0 CPython/2.7.13'
actual = ua.UserAgentBuilder('fake', '1.0.0').include_implementation(
).build()
assert expected == actual
@patch('platform.system', return_value='Linux')
@patch('platform.release', return_value='4.9.5')
def test_include_system(self, *_):
expected = 'fake/1.0.0 Linux/4.9.5'
actual = ua.UserAgentBuilder('fake', '1.0.0').include_system(
).build()
assert expected == actual
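# Sketch (not part of the original tests): the include_* methods return the
# builder, so a complete user-agent string chains into one expression.
def _example_builder_chain():
    return ua.UserAgentBuilder('my-app', '1.2.3').include_extras(
        [('requests', '2.0.1')]
    ).include_implementation().include_system().build()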
class TestUserAgent(unittest.TestCase):
def test_user_agent_provides_package_name(self):
assert "my-package" in ua.user_agent("my-package", "0.0.1")
def test_user_agent_provides_package_version(self):
assert "0.0.1" in ua.user_agent("my-package", "0.0.1")
def test_user_agent_builds_extras_appropriately(self):
assert "extra/1.0.0" in ua.user_agent(
"my-package", "0.0.1", extras=[("extra", "1.0.0")]
)
def test_user_agent_checks_extras_for_tuples_of_incorrect_length(self):
with pytest.raises(ValueError):
ua.user_agent("my-package", "0.0.1", extras=[
("extra", "1.0.0", "oops")
])
with pytest.raises(ValueError):
ua.user_agent("my-package", "0.0.1", extras=[
("extra",)
])
class TestImplementationString(unittest.TestCase):
@patch('platform.python_implementation')
@patch('platform.python_version')
def test_cpython_implementation(self, mock_version, mock_implementation):
mock_implementation.return_value = 'CPython'
mock_version.return_value = '2.7.5'
assert 'CPython/2.7.5' == ua._implementation_string()
@patch('platform.python_implementation')
def test_pypy_implementation_final(self, mock_implementation):
mock_implementation.return_value = 'PyPy'
sys.pypy_version_info = Object()
sys.pypy_version_info.major = 2
sys.pypy_version_info.minor = 0
sys.pypy_version_info.micro = 1
sys.pypy_version_info.releaselevel = 'final'
assert 'PyPy/2.0.1' == ua._implementation_string()
@patch('platform.python_implementation')
def test_pypy_implementation_non_final(self, mock_implementation):
mock_implementation.return_value = 'PyPy'
sys.pypy_version_info = Object()
sys.pypy_version_info.major = 2
sys.pypy_version_info.minor = 0
sys.pypy_version_info.micro = 1
sys.pypy_version_info.releaselevel = 'beta2'
assert 'PyPy/2.0.1beta2' == ua._implementation_string()
@patch('platform.python_implementation')
def test_unknown_implementation(self, mock_implementation):
mock_implementation.return_value = "Lukasa'sSuperPython"
assert "Lukasa'sSuperPython/Unknown" == ua._implementation_string()

View File

@ -0,0 +1,59 @@
"""Module containing tests for requests_toolbelt.threaded API."""
import mock
import pytest
from requests_toolbelt._compat import queue
from requests_toolbelt import threaded
def test_creates_a_pool_for_the_user():
"""Assert a Pool object is used correctly and as we expect.
This just ensures that we're not jumping through any extra hoops with our
internal usage of a Pool object.
"""
mocked_pool = mock.Mock(spec=['join_all', 'responses', 'exceptions'])
with mock.patch('requests_toolbelt.threaded.pool.Pool') as Pool:
Pool.return_value = mocked_pool
threaded.map([{}, {}])
assert Pool.called is True
_, kwargs = Pool.call_args
assert 'job_queue' in kwargs
assert isinstance(kwargs['job_queue'], queue.Queue)
mocked_pool.join_all.assert_called_once_with()
mocked_pool.responses.assert_called_once_with()
mocked_pool.exceptions.assert_called_once_with()
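# Sketch (not part of the original tests): in normal use threaded.map takes
# an iterable of request-kwargs dictionaries and returns two generators, one
# of responses and one of exceptions. The URLs are illustrative only and the
# helper is not collected by pytest.
def _example_threaded_map():
    responses, exceptions = threaded.map([
        {'method': 'GET', 'url': 'https://httpbin.org/get'},
        {'method': 'GET', 'url': 'https://httpbin.org/status/404'},
    ])
    return list(responses), list(exceptions)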
def test_raises_a_value_error_for_non_dictionaries():
"""Exercise our lazy valdation."""
with pytest.raises(ValueError):
threaded.map([[], []])
def test_raises_a_value_error_for_falsey_requests():
"""Assert that the requests param is truthy."""
with pytest.raises(ValueError):
threaded.map([])
with pytest.raises(ValueError):
threaded.map(None)
def test_passes_on_kwargs():
"""Verify that we pass on kwargs to the Pool constructor."""
mocked_pool = mock.Mock(spec=['join_all', 'responses', 'exceptions'])
with mock.patch('requests_toolbelt.threaded.pool.Pool') as Pool:
Pool.return_value = mocked_pool
threaded.map([{}, {}], num_processes=1000,
initializer=test_passes_on_kwargs)
_, kwargs = Pool.call_args
assert 'job_queue' in kwargs
assert 'num_processes' in kwargs
assert 'initializer' in kwargs
assert kwargs['num_processes'] == 1000
assert kwargs['initializer'] == test_passes_on_kwargs

226
tests/threaded/test_pool.py Normal file
View File

@ -0,0 +1,226 @@
"""Module containing the tests for requests_toolbelt.threaded.pool."""
try:
import queue # Python 3
except ImportError:
import Queue as queue
import unittest
import mock
import pytest
from requests_toolbelt.threaded import pool
from requests_toolbelt.threaded import thread
class TestPool(unittest.TestCase):
"""Collection of tests for requests_toolbelt.threaded.pool.Pool."""
def test_requires_positive_number_of_processes(self):
"""Show that the number of processes has to be > 0."""
with pytest.raises(ValueError):
pool.Pool(None, num_processes=0)
with pytest.raises(ValueError):
pool.Pool(None, num_processes=-1)
def test_number_of_processes_can_be_arbitrary(self):
"""Show that the number of processes can be set."""
p = pool.Pool(None, num_processes=100)
assert p._processes == 100
assert len(p._pool) == 100
p = pool.Pool(None, num_processes=1)
assert p._processes == 1
assert len(p._pool) == 1
def test_initializer_is_called(self):
"""Ensure that the initializer function is called."""
initializer = mock.MagicMock()
pool.Pool(None, num_processes=1, initializer=initializer)
assert initializer.called is True
initializer.assert_called_once_with(mock.ANY)
def test_auth_generator_is_called(self):
"""Ensure that the auth_generator function is called."""
auth_generator = mock.MagicMock()
pool.Pool(None, num_processes=1, auth_generator=auth_generator)
assert auth_generator.called is True
auth_generator.assert_called_once_with(mock.ANY)
def test_session_is_called(self):
"""Ensure that the session function is called."""
session = mock.MagicMock()
pool.Pool(None, num_processes=1, session=session)
assert session.called is True
session.assert_called_once_with()
def test_from_exceptions_populates_a_queue(self):
"""Ensure a Queue is properly populated from exceptions."""
urls = ["https://httpbin.org/get?n={0}".format(n) for n in range(5)]
Exc = pool.ThreadException
excs = (Exc({'method': 'GET', 'url': url}, None) for url in urls)
job_queue = mock.MagicMock()
with mock.patch.object(queue, 'Queue', return_value=job_queue):
with mock.patch.object(thread, 'SessionThread'):
pool.Pool.from_exceptions(excs)
assert job_queue.put.call_count == 5
assert job_queue.put.mock_calls == [
mock.call({'method': 'GET', 'url': url})
for url in urls
]
def test_from_urls_constructs_get_requests(self):
"""Ensure a Queue is properly populated from an iterable of urls."""
urls = ["https://httpbin.org/get?n={0}".format(n) for n in range(5)]
job_queue = mock.MagicMock()
with mock.patch.object(queue, 'Queue', return_value=job_queue):
with mock.patch.object(thread, 'SessionThread'):
pool.Pool.from_urls(urls)
assert job_queue.put.call_count == 5
assert job_queue.put.mock_calls == [
mock.call({'method': 'GET', 'url': url})
for url in urls
]
def test_from_urls_constructs_get_requests_with_kwargs(self):
"""Ensure a Queue is properly populated from an iterable of urls."""
def merge(*args):
final = {}
for d in args:
final.update(d)
return final
urls = ["https://httpbin.org/get?n={0}".format(n) for n in range(5)]
kwargs = {'stream': True, 'headers': {'Accept': 'application/json'}}
job_queue = mock.MagicMock()
with mock.patch.object(queue, 'Queue', return_value=job_queue):
with mock.patch.object(thread, 'SessionThread'):
pool.Pool.from_urls(urls, kwargs)
assert job_queue.put.call_count == 5
assert job_queue.put.mock_calls == [
mock.call(merge({'method': 'GET', 'url': url}, kwargs))
for url in urls
]
def test_join_all(self):
"""Ensure that all threads are joined properly."""
session_threads = []
def _side_effect(*args, **kwargs):
thread = mock.MagicMock()
session_threads.append(thread)
return thread
with mock.patch.object(thread, 'SessionThread',
side_effect=_side_effect):
pool.Pool(None).join_all()
for st in session_threads:
st.join.assert_called_once_with()
def test_get_response_returns_thread_response(self):
"""Ensure that a ThreadResponse is made when there's data."""
queues = []
def _side_effect():
q = mock.MagicMock()
q.get_nowait.return_value = ({}, None)
queues.append(q)
return q
with mock.patch.object(queue, 'Queue', side_effect=_side_effect):
with mock.patch.object(thread, 'SessionThread'):
p = pool.Pool(None)
assert len(queues) == 2
assert isinstance(p.get_response(), pool.ThreadResponse)
assert len([q for q in queues if q.get_nowait.called]) == 1
def test_get_exception_returns_thread_exception(self):
"""Ensure that a ThreadException is made when there's data."""
queues = []
def _side_effect():
q = mock.MagicMock()
q.get_nowait.return_value = ({}, None)
queues.append(q)
return q
with mock.patch.object(queue, 'Queue', side_effect=_side_effect):
with mock.patch.object(thread, 'SessionThread'):
p = pool.Pool(None)
assert len(queues) == 2
assert isinstance(p.get_exception(), pool.ThreadException)
assert len([q for q in queues if q.get_nowait.called]) == 1
def test_get_response_returns_none_when_queue_is_empty(self):
"""Ensure that None is returned when the response Queue is empty."""
queues = []
def _side_effect():
q = mock.MagicMock()
q.get_nowait.side_effect = queue.Empty()
queues.append(q)
return q
with mock.patch.object(queue, 'Queue', side_effect=_side_effect):
with mock.patch.object(thread, 'SessionThread'):
p = pool.Pool(None)
assert len(queues) == 2
assert p.get_response() is None
assert len([q for q in queues if q.get_nowait.called]) == 1
def test_get_exception_returns_none_when_queue_is_empty(self):
"""Ensure that None is returned when the exception Queue is empty."""
queues = []
def _side_effect():
q = mock.MagicMock()
q.get_nowait.side_effect = queue.Empty()
queues.append(q)
return q
with mock.patch.object(queue, 'Queue', side_effect=_side_effect):
with mock.patch.object(thread, 'SessionThread'):
p = pool.Pool(None)
assert len(queues) == 2
assert p.get_exception() is None
assert len([q for q in queues if q.get_nowait.called]) == 1
def test_lists_are_correctly_returned(self):
"""Ensure that exceptions and responses return correct lists."""
def _make_queue():
q = queue.Queue()
q.put(({}, None))
return q
with mock.patch.object(thread, 'SessionThread'):
p = pool.Pool(None)
# Set up real queues.
p._response_queue = _make_queue()
p._exc_queue = _make_queue()
excs = list(p.exceptions())
assert len(excs) == 1
for exc in excs:
assert isinstance(exc, pool.ThreadException)
resps = list(p.responses())
assert len(resps) == 1
for resp in resps:
assert isinstance(resp, pool.ThreadResponse)

View File

@ -0,0 +1,131 @@
"""Module containing the tests for requests_toolbelt.threaded.thread."""
try:
import queue # Python 3
except ImportError:
import Queue as queue
import threading
import unittest
import uuid
import mock
import requests.exceptions
from requests_toolbelt.threaded import thread
def _make_mocks():
return (mock.MagicMock() for _ in range(4))
def _initialize_a_session_thread(session=None, job_queue=None,
response_queue=None, exception_queue=None):
with mock.patch.object(threading, 'Thread') as Thread:
thread_instance = mock.MagicMock()
Thread.return_value = thread_instance
st = thread.SessionThread(
initialized_session=session,
job_queue=job_queue,
response_queue=response_queue,
exception_queue=exception_queue,
)
return (st, thread_instance, Thread)
class TestSessionThread(unittest.TestCase):
"""Tests for requests_toolbelt.threaded.thread.SessionThread."""
def test_thread_initialization(self):
"""Test the way a SessionThread is initialized.
We want to ensure that we create a thread with a name generated by the
uuid module, and that we pass the right method to use as a target.
"""
with mock.patch.object(uuid, 'uuid4', return_value='test'):
(st, thread_instance, Thread) = _initialize_a_session_thread()
Thread.assert_called_once_with(target=st._make_request, name='test')
assert thread_instance.daemon is True
assert thread_instance._state == 0
thread_instance.start.assert_called_once_with()
def test_is_alive_proxies_to_worker(self):
"""Test that we proxy the is_alive method to the Thread."""
with mock.patch.object(threading, 'Thread') as Thread:
thread_instance = mock.MagicMock()
Thread.return_value = thread_instance
st = thread.SessionThread(None, None, None, None)
st.is_alive()
thread_instance.is_alive.assert_called_once_with()
def test_join_proxies_to_worker(self):
"""Test that we proxy the join method to the Thread."""
st, thread_instance, _ = _initialize_a_session_thread()
st.join()
thread_instance.join.assert_called_once_with()
def test_handle_valid_request(self):
"""Test that a response is added to the right queue."""
session, job_queue, response_queue, exception_queue = _make_mocks()
response = mock.MagicMock()
session.request.return_value = response
st, _, _ = _initialize_a_session_thread(
session, job_queue, response_queue, exception_queue)
st._handle_request({'method': 'GET', 'url': 'http://example.com'})
session.request.assert_called_once_with(
method='GET',
url='http://example.com'
)
response_queue.put.assert_called_once_with(
({'method': 'GET', 'url': 'http://example.com'}, response)
)
assert exception_queue.put.called is False
assert job_queue.get.called is False
assert job_queue.get_nowait.called is False
assert job_queue.task_done.called is True
def test_handle_invalid_request(self):
"""Test that exceptions from requests are added to the right queue."""
session, job_queue, response_queue, exception_queue = _make_mocks()
exception = requests.exceptions.InvalidURL()
def _side_effect(*args, **kwargs):
raise exception
# Make the request raise an exception
session.request.side_effect = _side_effect
st, _, _ = _initialize_a_session_thread(
session, job_queue, response_queue, exception_queue)
st._handle_request({'method': 'GET', 'url': 'http://example.com'})
session.request.assert_called_once_with(
method='GET',
url='http://example.com'
)
exception_queue.put.assert_called_once_with(
({'method': 'GET', 'url': 'http://example.com'}, exception)
)
assert response_queue.put.called is False
assert job_queue.get.called is False
assert job_queue.get_nowait.called is False
assert job_queue.task_done.called is True
def test_make_request(self):
"""Test that _make_request exits when the queue is Empty."""
job_queue = next(_make_mocks())
job_queue.get_nowait.side_effect = queue.Empty()
st, _, _ = _initialize_a_session_thread(job_queue=job_queue)
st._make_request()
job_queue.get_nowait.assert_called_once_with()

55
tox.ini Normal file
View File

@ -0,0 +1,55 @@
[tox]
envlist = py27,py33,py34,py35,pypy,{py27,py34}-flake8,docstrings
[testenv]
pip_pre = False
deps =
requests{env:REQUESTS_VERSION:>=2.0.1,<3.0.0}
pytest
mock
betamax>0.5.0
commands = py.test {posargs}
[testenv:py27-flake8]
basepython = python2.7
deps =
flake8
commands = flake8 {posargs} requests_toolbelt
[testenv:py34-flake8]
basepython = python3.4
deps =
flake8
commands = flake8 {posargs} requests_toolbelt
[testenv:docstrings]
deps =
flake8
flake8-docstrings
commands = flake8 {posargs} requests_toolbelt
[testenv:docs]
deps =
sphinx>=1.3.0
sphinx_rtd_theme
.
commands =
sphinx-build -E -c docs -b html docs/ docs/_build/html
[testenv:readme]
deps =
readme_renderer
commands =
python setup.py check -m -r -s
[testenv:release]
deps =
twine >= 1.4.0
wheel
commands =
python setup.py sdist bdist_wheel
twine upload --skip-existing dist/*
[pytest]
addopts = -q
norecursedirs = *.egg .git .* _*