829 lines
27 KiB
Python
829 lines
27 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import decimal
|
|
import calendar
|
|
import re
|
|
import os
|
|
import time
|
|
import base64
|
|
import json
|
|
import subprocess
|
|
import unicodedata
|
|
import hashlib
|
|
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
|
|
try:
|
|
from PIL import Image
|
|
except ImportError:
|
|
Image = None
|
|
|
|
from django.conf import settings
|
|
from django.utils import datetime_safe
|
|
from django.utils import six
|
|
from django.utils.encoding import force_text
|
|
from django.utils.html import strip_tags
|
|
from django.template import engines, TemplateSyntaxError, VariableDoesNotExist
|
|
from django.utils.six.moves.html_parser import HTMLParser
|
|
from django.utils.six.moves.urllib.parse import quote, urlencode
|
|
from django.utils.six.moves.urllib import parse as urlparse
|
|
|
|
from quixote import get_publisher, get_response, get_request, redirect
|
|
from quixote.html import htmltext
|
|
|
|
from . import _, force_str
|
|
from . import get_cfg, get_logger, ezt
|
|
from .errors import ConnectionError, RequestError
|
|
from .template import Template
|
|
|
|
from django.utils.six import BytesIO, StringIO
|
|
|
|
try:
|
|
subprocess.check_call(['which', 'pdftoppm'], stdout=subprocess.DEVNULL)
|
|
HAS_PDFTOPPM = True
|
|
except subprocess.CalledProcessError:
|
|
HAS_PDFTOPPM = False
|
|
|
|
|
|
EXIF_ORIENTATION = 0x0112
|
|
|
|
|
|
class ThumbnailError(Exception):
|
|
pass
|
|
|
|
|
|
def get_abs_path(s):
|
|
if not s:
|
|
return s
|
|
if s[0] == '/':
|
|
return s
|
|
return os.path.join(get_publisher().app_dir, s)
|
|
|
|
|
|
def get_lasso_server():
|
|
if not get_cfg('sp'):
|
|
return None
|
|
import lasso
|
|
server = lasso.Server(
|
|
get_abs_path(get_cfg('sp')['saml2_metadata']),
|
|
get_abs_path(get_cfg('sp')['privatekey']),
|
|
None, None)
|
|
server.signatureMethod = lasso.SIGNATURE_METHOD_RSA_SHA256
|
|
|
|
# Set encryption private key
|
|
encryption_privatekey = get_abs_path(get_cfg('sp').get('encryption_privatekey'))
|
|
if encryption_privatekey and os.path.exists(encryption_privatekey):
|
|
try:
|
|
server.setEncryptionPrivateKey(encryption_privatekey)
|
|
except lasso.Error as error:
|
|
get_logger().warn('Failed to set encryption private key')
|
|
|
|
for klp, idp in sorted(get_cfg('idp', {}).items(), key=lambda k: k[0]):
|
|
try:
|
|
server.addProvider(
|
|
lasso.PROVIDER_ROLE_IDP,
|
|
get_abs_path(idp['metadata']),
|
|
get_abs_path(idp.get('publickey')),
|
|
get_abs_path(idp.get('cacertchain')))
|
|
except lasso.Error as error:
|
|
if error[0] == lasso.SERVER_ERROR_ADD_PROVIDER_PROTOCOL_MISMATCH:
|
|
continue
|
|
if error[0] == lasso.SERVER_ERROR_ADD_PROVIDER_FAILED:
|
|
continue
|
|
raise
|
|
|
|
encryption_mode = lasso.ENCRYPTION_MODE_NONE
|
|
if idp.get('encrypt_nameid', False):
|
|
encryption_mode |= lasso.ENCRYPTION_MODE_NAMEID
|
|
provider_t = get_provider(klp)
|
|
provider = server.getProvider(provider_t.providerId)
|
|
if provider is not None:
|
|
provider.setEncryptionMode(encryption_mode)
|
|
|
|
return server
|
|
|
|
|
|
def get_provider_label(provider):
|
|
if not provider:
|
|
return None
|
|
if not hasattr(provider, str('getOrganization')):
|
|
return provider.providerId
|
|
|
|
organization = provider.getOrganization()
|
|
if not organization:
|
|
return provider.providerId
|
|
|
|
name = re.findall("<OrganizationDisplayName.*>(.*?)</OrganizationDisplayName>", organization)
|
|
if not name:
|
|
name = re.findall("<OrganizationName.*>(.*?)</OrganizationName>", organization)
|
|
if not name:
|
|
return provider.providerId
|
|
return htmltext(name[0].decode('utf8').encode(get_publisher().site_charset))
|
|
|
|
|
|
def get_provider(provider_key):
|
|
lp = get_cfg('idp', {}).get(provider_key)
|
|
if not lp:
|
|
raise KeyError()
|
|
|
|
import lasso
|
|
|
|
publickey_fn = None
|
|
if lp.get('publickey'):
|
|
publickey_fn = get_abs_path(lp['publickey'])
|
|
# cacertchain (not really necessary to get provider label)
|
|
|
|
try:
|
|
provider = lasso.Provider(lasso.PROVIDER_ROLE_IDP,
|
|
get_abs_path(lp['metadata']), publickey_fn, None)
|
|
except lasso.Error:
|
|
raise KeyError()
|
|
|
|
return provider
|
|
|
|
|
|
def get_provider_key(provider_id):
|
|
return provider_id.replace('://', '-').replace('/', '-').replace('?', '-').replace(':', '-')
|
|
|
|
|
|
def simplify(s, space='-'):
|
|
if s is None:
|
|
return ''
|
|
if not isinstance(s, six.text_type):
|
|
if get_publisher() and get_publisher().site_charset:
|
|
s = force_text('%s' % s, get_publisher().site_charset, errors='ignore')
|
|
else:
|
|
s = force_text('%s' % s, 'iso-8859-1', errors='ignore')
|
|
s = force_text(unicodedata.normalize('NFKD', s).encode('ascii', 'ignore'))
|
|
s = re.sub(r'[^\w\s\'%s]' % space, '', s).strip().lower()
|
|
s = re.sub(r'[\s\'%s]+' % space, space, s)
|
|
return s
|
|
|
|
|
|
def get_datetime_language():
|
|
lang = get_cfg('language', {}).get('language', None)
|
|
if lang is None:
|
|
if os.environ.get('LC_TIME'):
|
|
lang = os.environ.get('LC_TIME')[:2]
|
|
elif os.environ.get('LC_ALL'):
|
|
lang = os.environ.get('LC_ALL')[:2]
|
|
return lang
|
|
|
|
|
|
def strftime(fmt, dt):
|
|
if not dt:
|
|
return ''
|
|
if not isinstance(dt, datetime.datetime):
|
|
if isinstance(dt, datetime.date):
|
|
dt = datetime.datetime(dt.year, dt.month, dt.day)
|
|
else:
|
|
# consider it a 9 elements tuple
|
|
dt = datetime.datetime(*dt[:6])
|
|
else:
|
|
# un-lazyfication: get real datetime objet,
|
|
# in case of a lazy datetime objet
|
|
dt = dt.replace()
|
|
return datetime_safe.strftime(dt, fmt)
|
|
|
|
|
|
def localstrftime(t):
|
|
if not t:
|
|
return ''
|
|
return strftime(datetime_format(), t)
|
|
|
|
DATE_FORMATS = {
|
|
'C': ['%Y-%m-%d', '%y-%m-%d'],
|
|
'fr': ['%d/%m/%Y', '%d/%m/%y'],
|
|
}
|
|
|
|
DATETIME_FORMATS = {
|
|
'C': ['%Y-%m-%d %H:%M', '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%dT%H:%M:%SZ',
|
|
'%y-%m-%d %H:%M', '%y-%m-%d %H:%M:%S'],
|
|
'fr': ['%d/%m/%Y %H:%M', '%d/%m/%Y %H:%M:%S', '%d/%m/%Y %Hh%M',
|
|
'%d/%m/%y %H:%M', '%d/%m/%y %H:%M:%S', '%d/%m/%y %Hh%M'],
|
|
}
|
|
|
|
|
|
def datetime_format():
|
|
lang = get_datetime_language()
|
|
if lang not in DATETIME_FORMATS:
|
|
lang = 'C'
|
|
return DATETIME_FORMATS[lang][0]
|
|
|
|
|
|
def date_format():
|
|
lang = get_datetime_language()
|
|
if lang not in DATE_FORMATS:
|
|
lang = 'C'
|
|
return DATE_FORMATS[lang][0]
|
|
|
|
|
|
def get_as_datetime(s):
|
|
formats = [datetime_format(), date_format()] # prefer current locale
|
|
for value in DATETIME_FORMATS.values():
|
|
formats.extend(value)
|
|
for value in DATE_FORMATS.values():
|
|
formats.extend(value)
|
|
exception = ValueError()
|
|
for format_string in formats:
|
|
try:
|
|
return datetime.datetime.strptime(s, format_string)
|
|
except ValueError as e:
|
|
exception = e
|
|
raise exception
|
|
|
|
|
|
def site_encode(s):
|
|
if s is None:
|
|
return None
|
|
if isinstance(s, str):
|
|
return s
|
|
if not isinstance(s, six.string_types):
|
|
s = force_text(s)
|
|
return s.encode(get_publisher().site_charset)
|
|
|
|
|
|
def ellipsize(s, length=30):
|
|
s = force_text(s, get_publisher().site_charset, errors='replace')
|
|
if s and len(s) >= length:
|
|
s = s[:length-5] + ' (...)'
|
|
return force_str(s)
|
|
|
|
|
|
def get_month_name(month):
|
|
month_names = [_('January'), _('February'), _('March'), _('April'),
|
|
_('May'), _('June'), _('July'), _('August'),
|
|
_('September'), _('October'), _('November'), _('December') ]
|
|
return month_names[month-1]
|
|
|
|
|
|
def format_time(datetime, formatstring, gmtime = False):
|
|
if not datetime:
|
|
return '?'
|
|
if type(datetime) in (int, float):
|
|
if gmtime:
|
|
datetime = time.gmtime(datetime)
|
|
else:
|
|
datetime = time.localtime(datetime)
|
|
if len(datetime) == 2:
|
|
year, month = datetime
|
|
weekday = None
|
|
elif len(datetime) == 3:
|
|
year, month, day = datetime
|
|
weekday = None
|
|
else:
|
|
year, month, day, hour, minute, second, weekday = datetime[:7]
|
|
|
|
weekday_names = [_('Monday'), _('Tuesday'), _('Wednesday'),
|
|
_('Thursday'), _('Friday'), _('Saturday'), _('Sunday')]
|
|
|
|
if weekday is not None:
|
|
weekday_name = weekday_names[weekday]
|
|
lower_weekday_name = weekday_name.lower()
|
|
abbr_weekday_name = weekday_name[:3]
|
|
|
|
month_name = get_month_name(month)
|
|
lower_month_name = month_name.lower()
|
|
abbr_month_name = month_name[:3]
|
|
|
|
return formatstring % locals()
|
|
|
|
|
|
def _http_request(url, method='GET', body=None, headers={}, cert_file=None, timeout=None,
|
|
raise_on_http_errors=False):
|
|
get_publisher().reload_cfg()
|
|
|
|
splitted_url = urlparse.urlsplit(url)
|
|
if splitted_url.scheme not in ('http', 'https'):
|
|
raise ConnectionError('invalid scheme in URL %s' % url)
|
|
|
|
hostname = splitted_url.netloc
|
|
timeout = timeout or settings.REQUESTS_TIMEOUT
|
|
|
|
# re-use HTTP adapter to get connection pooling and keep-alive.
|
|
adapter = getattr(get_publisher(), '_http_adapter', None)
|
|
if adapter is None:
|
|
adapter = get_publisher()._http_adapter = HTTPAdapter()
|
|
|
|
session = requests.Session()
|
|
session.mount('http://', adapter)
|
|
session.mount('https://', adapter)
|
|
|
|
try:
|
|
response = session.request(method, url, headers=headers, data=body,
|
|
timeout=timeout, cert=cert_file, proxies=settings.REQUESTS_PROXIES)
|
|
except requests.Timeout:
|
|
raise ConnectionError('connection timed out while fetching the page')
|
|
except requests.RequestException as err:
|
|
raise ConnectionError('error in HTTP request to %s (%s)' % (hostname, err))
|
|
else:
|
|
data = response.content
|
|
status = response.status_code
|
|
auth_header = response.headers.get('WWW-Authenticate')
|
|
|
|
if raise_on_http_errors and not (200 <= status < 300):
|
|
raise ConnectionError('error in HTTP request to (status: %s)' % status)
|
|
|
|
return response, status, data, auth_header
|
|
|
|
|
|
def urlopen(url, data=None):
|
|
response, status, data, auth_header = _http_request(
|
|
url, 'GET' if data is None else 'POST',
|
|
body=data,
|
|
raise_on_http_errors=True)
|
|
return BytesIO(data)
|
|
|
|
|
|
def http_get_page(url, **kwargs):
|
|
return _http_request(url, **kwargs)
|
|
|
|
|
|
def http_patch_request(url, body=None, **kwargs):
|
|
return _http_request(url, 'PATCH', body, **kwargs)
|
|
|
|
|
|
def http_post_request(url, body=None, **kwargs):
|
|
return _http_request(url, 'POST', body, **kwargs)
|
|
|
|
|
|
def http_delete_request(url, **kwargs):
|
|
return _http_request(url, 'DELETE', **kwargs)
|
|
|
|
|
|
def get_variadic_url(url, variables, encode_query=True):
|
|
if not Template.is_template_string(url):
|
|
return url
|
|
|
|
# django template
|
|
if '{{' in url or '{%' in url:
|
|
try:
|
|
url = Template(url).render(variables)
|
|
p = urlparse.urlsplit(url)
|
|
scheme, netloc, path, query, fragment = (
|
|
p.scheme, p.netloc, p.path, p.query, p.fragment)
|
|
if path.startswith('//'):
|
|
# don't let double slash happen at the root of the URL, this
|
|
# happens when a template such as {{url}}/path is used (with
|
|
# {{url}} already ending with a slash).
|
|
path = path[1:]
|
|
return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
|
|
except (TemplateSyntaxError, VariableDoesNotExist):
|
|
return url
|
|
|
|
# ezt template, try to be safe
|
|
def ezt_substitute(template, variables):
|
|
tmpl = ezt.Template()
|
|
tmpl.parse(template)
|
|
fd = StringIO()
|
|
tmpl.generate(fd, variables)
|
|
return fd.getvalue()
|
|
|
|
def partial_quote(string):
|
|
# unquote brackets, as there may be further processing that needs them
|
|
# intact.
|
|
return quote(string).replace('%5B', '[').replace('%5D', ']')
|
|
|
|
p = urlparse.urlsplit(url)
|
|
scheme, netloc, path, query, fragment = \
|
|
p.scheme, p.netloc, p.path, p.query, p.fragment
|
|
if netloc and '[' in netloc:
|
|
netloc = ezt_substitute(netloc, variables)
|
|
if path and '[' in path:
|
|
if scheme == '' and netloc == '':
|
|
# this happened because the variable was set in the scheme
|
|
# (ex: http[https]://www.example.net) or because the value starts
|
|
# with a variable name (ex: [url]); in that situation we do not
|
|
# quote at all.
|
|
if path.count('//') == 1:
|
|
# there were no / in the original path (the two / comes from
|
|
# the scheme/netloc separation, this means there is no path)
|
|
before_path = ezt_substitute(path, variables)
|
|
p2 = urlparse.urlsplit(before_path)
|
|
scheme, netloc, path = p2.scheme, p2.netloc, p2.path
|
|
else:
|
|
# there is a path, we need to get back to the original URL and
|
|
# split it on the last /, to isolate the path part.
|
|
lastslash = '/' if path.endswith('/') else ''
|
|
if '/' in path:
|
|
before_path, path = path.rsplit('/', 1)
|
|
else:
|
|
before_path, path = path, ''
|
|
before_path = ezt_substitute(before_path, variables)
|
|
p2 = urlparse.urlsplit(before_path)
|
|
scheme, netloc = p2.scheme, p2.netloc
|
|
if p2.path:
|
|
if not path:
|
|
path, query2 = p2.path + lastslash, p2.query
|
|
else:
|
|
path, query2 = p2.path + '/' + path, p2.query
|
|
if query and query2:
|
|
query += '&' + query2
|
|
else:
|
|
query = query or query2
|
|
if path:
|
|
path = partial_quote(ezt_substitute(path, variables))
|
|
if not path:
|
|
path = '/'
|
|
if path.startswith('//'):
|
|
path = path[1:]
|
|
if fragment and '[' in fragment:
|
|
fragment = partial_quote(ezt_substitute(fragment, variables))
|
|
if query and '[' in query:
|
|
p_qs = urlparse.parse_qsl(query)
|
|
if len(p_qs) == 0:
|
|
# this happened because the query string has no key/values,
|
|
# probably because it's a single substitution variable (ex:
|
|
# http://www.example.net/foobar?[query])
|
|
query = ezt_substitute(query, variables)
|
|
else:
|
|
query = []
|
|
for k, v in p_qs:
|
|
if '[' in k:
|
|
k = ezt_substitute(k, variables)
|
|
if '[' in v:
|
|
v = ezt_substitute(v, variables)
|
|
query.append((k, v))
|
|
if encode_query:
|
|
query = urlencode(query)
|
|
else:
|
|
query = '&'.join('%s=%s' % (k,v) for (k,v) in query)
|
|
return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
|
|
|
|
|
|
def get_foreground_colour(background_colour):
|
|
"""Calculates the luminance of the given colour (six hexadecimal digits)
|
|
and returns an appropriate foreground colour."""
|
|
# luminance coefficients taken from section C-9 from
|
|
# http://www.faqs.org/faqs/graphics/colorspace-faq/
|
|
brightess = int(background_colour[0:2], 16) * 0.212671 + \
|
|
int(background_colour[2:4], 16) * 0.715160 + \
|
|
int(background_colour[4:6], 16) * 0.072169
|
|
if brightess > 128:
|
|
fg_colour = 'black'
|
|
else:
|
|
fg_colour = 'white'
|
|
return fg_colour
|
|
|
|
|
|
def C_(msg):
|
|
'''Translates and removes context from message'''
|
|
return _(msg).split('|', 1)[1]
|
|
|
|
|
|
def indent_xml(elem, level=0):
|
|
# in-place prettyprint formatter
|
|
# http://effbot.org/zone/element-lib.htm#prettyprint
|
|
i = "\n" + level*" "
|
|
if len(elem):
|
|
if not elem.text or not elem.text.strip():
|
|
elem.text = i + " "
|
|
for elem in elem:
|
|
indent_xml(elem, level+1)
|
|
if not elem.tail or not elem.tail.strip():
|
|
elem.tail = i
|
|
else:
|
|
if level and (not elem.tail or not elem.tail.strip()):
|
|
elem.tail = i
|
|
return elem
|
|
|
|
|
|
def xml_node_text(node):
|
|
if node is None or node.text is None:
|
|
return None
|
|
return force_str(node.text)
|
|
|
|
|
|
def preprocess_struct_time(obj):
|
|
if isinstance(obj, time.struct_time):
|
|
dt = datetime.datetime(*obj[:6])
|
|
if dt.hour == 0 and dt.minute == 0 and dt.second == 0:
|
|
return dt.date()
|
|
return dt
|
|
elif isinstance(obj, list):
|
|
return [preprocess_struct_time(x) for x in obj]
|
|
elif isinstance(obj, dict):
|
|
new_d = {}
|
|
for k, v in obj.items():
|
|
new_d[preprocess_struct_time(k)] = preprocess_struct_time(v)
|
|
return new_d
|
|
return obj
|
|
|
|
|
|
class JSONEncoder(json.JSONEncoder):
|
|
def encode(self, obj):
|
|
return super(JSONEncoder, self).encode(preprocess_struct_time(obj))
|
|
|
|
def default(self, obj):
|
|
# make sure time.struct_time are not received as they do have a
|
|
# default serializer in Python 3 (they are tuples) and should
|
|
# be converted to datetime objects beforehand.
|
|
assert not isinstance(obj, time.struct_time), 'time.struct_time should not be serialized'
|
|
|
|
if isinstance(obj, datetime.datetime):
|
|
return obj.isoformat()
|
|
|
|
if isinstance(obj, datetime.date):
|
|
return obj.strftime('%Y-%m-%d')
|
|
|
|
if isinstance(obj, decimal.Decimal):
|
|
return str(obj)
|
|
|
|
if isinstance(obj, bytes):
|
|
return obj.decode('ascii')
|
|
|
|
if hasattr(obj, 'get_json_value'):
|
|
return obj.get_json_value()
|
|
|
|
if hasattr(obj, 'base_filename'):
|
|
return {
|
|
'filename': obj.base_filename,
|
|
'content_type': obj.content_type or 'application/octet-stream',
|
|
'content': base64.b64encode(obj.get_content()),
|
|
}
|
|
|
|
# Let the base class default method raise the TypeError
|
|
return json.JSONEncoder.default(self, obj)
|
|
|
|
|
|
def json_encode_helper(d, charset):
|
|
'''Encode a JSON structure into local charset'''
|
|
# since Python 3 strings are unicode, no need to convert anything
|
|
return d
|
|
|
|
|
|
def json_loads(value, charset=None):
|
|
return json.loads(force_text(value))
|
|
|
|
|
|
def json_response(data):
|
|
get_response().set_content_type('application/json')
|
|
if get_request().get_environ('HTTP_ORIGIN'):
|
|
get_response().set_header('Access-Control-Allow-Origin',
|
|
get_request().get_environ('HTTP_ORIGIN'))
|
|
get_response().set_header('Access-Control-Allow-Credentials', 'true')
|
|
get_response().set_header('Access-Control-Allow-Headers', 'x-requested-with')
|
|
json_str = json.dumps(data)
|
|
for variable in ('jsonpCallback', 'callback'):
|
|
if variable in get_request().form:
|
|
get_response().set_content_type('application/javascript')
|
|
json_str = '%s(%s);' % (get_request().form[variable], json_str)
|
|
break
|
|
return json_str
|
|
|
|
|
|
def parse_isotime(s):
|
|
s = s.replace('+00:00Z', 'Z') # clean lemonldap dates with both timezone and Z
|
|
t = time.strptime(s, '%Y-%m-%dT%H:%M:%SZ')
|
|
return calendar.timegm(t)
|
|
|
|
|
|
def file_digest(content, chunk_size=100000):
|
|
digest = hashlib.sha256()
|
|
content.seek(0)
|
|
|
|
def read_chunk():
|
|
return content.read(chunk_size)
|
|
for chunk in iter(read_chunk, b''):
|
|
digest.update(chunk)
|
|
return digest.hexdigest()
|
|
|
|
|
|
def can_thumbnail(content_type):
|
|
if content_type == 'application/pdf':
|
|
return bool(HAS_PDFTOPPM and Image)
|
|
if content_type and content_type.startswith('image/'):
|
|
return bool(Image is not None)
|
|
return False
|
|
|
|
|
|
def get_thumbnail(filepath, content_type=None):
|
|
if not can_thumbnail(content_type or ''):
|
|
raise ThumbnailError()
|
|
|
|
if content_type == 'application/pdf':
|
|
try:
|
|
fp = BytesIO(subprocess.check_output(
|
|
['pdftoppm', '-png', '-scale-to-x', '500', '-scale-to-y', '-1', filepath]))
|
|
except subprocess.CalledProcessError:
|
|
raise ThumbnailError()
|
|
else:
|
|
fp = open(filepath, 'rb')
|
|
|
|
try:
|
|
image = Image.open(fp)
|
|
try:
|
|
exif = image._getexif()
|
|
except:
|
|
exif = None
|
|
|
|
if exif:
|
|
# orientation code from sorl.thumbnail (engines/pil_engine.py)
|
|
orientation = exif.get(EXIF_ORIENTATION)
|
|
|
|
if orientation == 2:
|
|
image = image.transpose(Image.FLIP_LEFT_RIGHT)
|
|
elif orientation == 3:
|
|
image = image.rotate(180)
|
|
elif orientation == 4:
|
|
image = image.transpose(Image.FLIP_TOP_BOTTOM)
|
|
elif orientation == 5:
|
|
image = image.rotate(-90, expand=1).transpose(Image.FLIP_LEFT_RIGHT)
|
|
elif orientation == 6:
|
|
image = image.rotate(-90, expand=1)
|
|
elif orientation == 7:
|
|
image = image.rotate(90, expand=1).transpose(Image.FLIP_LEFT_RIGHT)
|
|
elif orientation == 8:
|
|
image = image.rotate(90, expand=1)
|
|
|
|
image.thumbnail((500, 300))
|
|
image_thumb_fp = BytesIO()
|
|
image.save(image_thumb_fp, "PNG")
|
|
except IOError:
|
|
# failed to create thumbnail.
|
|
raise ThumbnailError()
|
|
return image_thumb_fp.getvalue()
|
|
|
|
|
|
def normalize_geolocation(lat_lon):
|
|
'''Fit lat into -90/90 and lon into -180/180'''
|
|
def wrap(x, mini, maxi):
|
|
diff = maxi - mini
|
|
return ((x - mini) % diff + diff) % diff + mini
|
|
lat = decimal.Decimal(lat_lon['lat'])
|
|
lon = decimal.Decimal(lat_lon['lon'])
|
|
lat = wrap(lat, decimal.Decimal('-90.0'), decimal.Decimal('90.0'))
|
|
lon = wrap(lon, decimal.Decimal('-180.0'), decimal.Decimal('180.0'))
|
|
return {'lat': float(lat), 'lon': float(lon)}
|
|
|
|
|
|
def html2text(text):
|
|
if isinstance(text, (htmltext, str)):
|
|
text = force_text(str(text), get_publisher().site_charset)
|
|
return site_encode(HTMLParser().unescape(strip_tags(text)))
|
|
|
|
|
|
def validate_luhn(string_value, length=None):
|
|
'''Verify Luhn checksum on a string representing a number'''
|
|
if not string_value:
|
|
return False
|
|
if length is not None and len(string_value) != length:
|
|
return False
|
|
if not is_ascii_digit(string_value):
|
|
return False
|
|
|
|
# take all digits counting from the right, double value for digits pair
|
|
# index (counting from 1), if double has 2 digits take their sum
|
|
checksum = 0
|
|
for i, x in enumerate(reversed(string_value)):
|
|
if i % 2 == 0:
|
|
checksum += int(x)
|
|
else:
|
|
checksum += sum(int(y) for y in str(2 * int(x)))
|
|
if checksum % 10 != 0:
|
|
return False
|
|
return True
|
|
|
|
|
|
def is_ascii_digit(string_value):
|
|
return string_value and all((x in '0123456789' for x in string_value))
|
|
|
|
|
|
def validate_siren(string_value):
|
|
return validate_luhn(string_value, length=9)
|
|
|
|
|
|
def validate_siret(string_value):
|
|
# special case : La Poste
|
|
if not is_ascii_digit(string_value):
|
|
return False
|
|
if (string_value.startswith('356000000')
|
|
and len(string_value) == 14
|
|
and sum(int(x) for x in string_value) % 5 == 0):
|
|
return True
|
|
return validate_luhn(string_value, length=14)
|
|
|
|
|
|
def validate_nir(string_value):
|
|
'''https://fr.wikipedia.org/wiki/Num%C3%A9ro_de_s%C3%A9curit%C3%A9_sociale_en_France'''
|
|
if not string_value:
|
|
return False
|
|
if len(string_value) != 15:
|
|
return False
|
|
if string_value[0] == '0': # sex
|
|
return False
|
|
if string_value[7:10] == '000': # municipality
|
|
return False
|
|
if string_value[10:13] == '000': # order
|
|
return False
|
|
dept = string_value[5:7]
|
|
if dept == '2A':
|
|
string_value = string_value.replace('2A', '19', 1)
|
|
elif dept == '2B':
|
|
string_value = string_value.replace('2B', '18', 1)
|
|
if not is_ascii_digit(string_value):
|
|
return False
|
|
month = int(string_value[3:5])
|
|
if month < 50 and month not in list(range(1, 13)) + [20] + list(range(30, 43)):
|
|
return False
|
|
nir_key = string_value[13:]
|
|
return int(nir_key) == 97 - int(string_value[:13]) % 97
|
|
|
|
|
|
IBAN_LENGTH = {
|
|
# from https://www.iban.com/structure
|
|
'AD': 24, 'AE': 23, 'AL': 28, 'AT': 20, 'AZ': 28, 'BA': 20,
|
|
'BE': 16, 'BG': 22, 'BH': 22, 'BR': 29, 'BY': 28, 'CH': 21,
|
|
'CR': 22, 'CY': 28, 'CZ': 24, 'DE': 22, 'DK': 18, 'DO': 28,
|
|
'EE': 20, 'EG': 29, 'ES': 24, 'FI': 18, 'FO': 18, 'FR': 27,
|
|
'GB': 22, 'GE': 22, 'GI': 23, 'GL': 18, 'GR': 27, 'GT': 28,
|
|
'HR': 21, 'HU': 28, 'IE': 22, 'IL': 23, 'IQ': 23, 'IS': 26,
|
|
'IT': 27, 'JO': 30, 'KW': 30, 'KZ': 20, 'LB': 28, 'LC': 32,
|
|
'LI': 21, 'LT': 20, 'LU': 20, 'LV': 21, 'MC': 27, 'MD': 24,
|
|
'ME': 22, 'MK': 19, 'MR': 27, 'MT': 31, 'MU': 30, 'NL': 18,
|
|
'NO': 15, 'PK': 24, 'PL': 28, 'PS': 29, 'PT': 25, 'QA': 29,
|
|
'RO': 24, 'RS': 22, 'SA': 24, 'SC': 31, 'SE': 24, 'SI': 19,
|
|
'SK': 24, 'SM': 27, 'ST': 25, 'SV': 28, 'TL': 23, 'TN': 24,
|
|
'TR': 26, 'UA': 29, 'VA': 22, 'VG': 24, 'XK': 20,
|
|
# FR includes:
|
|
'GF': 27, 'GP': 27, 'MQ': 27, 'RE': 27, 'PF': 27, 'TF': 27,
|
|
'YT': 27, 'NC': 27, 'BL': 27, 'MF': 27, 'PM': 27, 'WF': 27,
|
|
# GB includes:
|
|
'IM': 22, 'GG': 22, 'JE': 22,
|
|
# FI includes:
|
|
'AX': 18,
|
|
# ES includes:
|
|
'IC': 24, 'EA': 24,
|
|
}
|
|
|
|
def validate_iban(string_value):
|
|
'''https://fr.wikipedia.org/wiki/International_Bank_Account_Number'''
|
|
if not string_value:
|
|
return False
|
|
country_code = string_value[:2]
|
|
iban_key = string_value[2:4]
|
|
bban = string_value[4:]
|
|
if not (country_code.isalpha() and country_code.isupper()):
|
|
return False
|
|
if IBAN_LENGTH.get(country_code) and len(string_value) != IBAN_LENGTH[country_code]:
|
|
return False
|
|
if not is_ascii_digit(iban_key):
|
|
return False
|
|
if not bban or is_ascii_digit(bban) and int(bban) == 0:
|
|
# bban is empty or a list of 0
|
|
return False
|
|
dummy_iban = bban + country_code + '00'
|
|
dummy_iban_converted = ''
|
|
for car in dummy_iban:
|
|
if car >= 'A' and car <= 'Z':
|
|
dummy_iban_converted += str(ord(car) - ord('A') + 10)
|
|
else:
|
|
dummy_iban_converted += car
|
|
if not is_ascii_digit(dummy_iban_converted):
|
|
return False
|
|
return int(iban_key) == 98 - int(dummy_iban_converted) % 97
|
|
|
|
|
|
def get_int_or_400(value):
|
|
if value is None:
|
|
return None
|
|
try:
|
|
return int(value)
|
|
except ValueError:
|
|
raise RequestError()
|
|
|
|
|
|
def get_order_by_or_400(value):
|
|
if value is None:
|
|
return None
|
|
if not re.match(r'-?[a-z0-9_-]+$', value):
|
|
raise RequestError()
|
|
return value
|
|
|
|
|
|
class QLookupRedirect:
|
|
'''
|
|
Class to use to interrupt a _q_lookup method and redirect.
|
|
'''
|
|
def __init__(self, url):
|
|
self.url = url
|
|
|
|
def _q_traverse(self, path):
|
|
return redirect(self.url)
|