1033 lines
30 KiB
Python
1033 lines
30 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
|
|
import calendar
|
|
import datetime
|
|
import decimal
|
|
import hashlib
|
|
import html
|
|
import io
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import time
|
|
import unicodedata
|
|
import urllib.parse
|
|
import xml.etree.ElementTree as ET
|
|
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
|
|
try:
|
|
from PIL import Image
|
|
except ImportError:
|
|
Image = None
|
|
|
|
from django.conf import settings
|
|
from django.template import TemplateSyntaxError, VariableDoesNotExist
|
|
from django.utils import datetime_safe
|
|
from django.utils.encoding import force_bytes, force_text
|
|
from django.utils.formats import localize
|
|
from django.utils.html import strip_tags
|
|
from django.utils.text import Truncator
|
|
from django.utils.timezone import is_aware, make_naive
|
|
from quixote import get_publisher, get_request, get_response, redirect
|
|
from quixote.errors import RequestError
|
|
from quixote.html import htmltext
|
|
|
|
from . import _, ezt, force_str, get_cfg, get_logger
|
|
from .errors import ConnectionError
|
|
from .template import Template
|
|
|
|
# Detect availability of the pdftoppm binary (poppler-utils), needed to
# build thumbnails of PDF files.
# NOTE(review): if the `which` binary itself is missing this raises
# FileNotFoundError, not CalledProcessError — confirm the deployment
# environment always provides `which`.
try:
    subprocess.check_call(['which', 'pdftoppm'], stdout=subprocess.DEVNULL)
    HAS_PDFTOPPM = True
except subprocess.CalledProcessError:
    HAS_PDFTOPPM = False
|
|
|
|
|
|
EXIF_ORIENTATION = 0x0112  # EXIF tag id (274) holding the image orientation
|
|
|
|
|
|
class ThumbnailError(Exception):
    # raised when a thumbnail cannot be generated (unsupported content
    # type, broken file, conversion failure)
    pass
|
|
|
|
|
|
def get_abs_path(s):
    """Return *s* as an absolute path, resolving relative values against
    the publisher application directory; empty values pass through."""
    if not s:
        return s
    if s.startswith('/'):
        return s
    return os.path.join(get_publisher().app_dir, s)
|
|
|
|
|
|
def get_lasso_server():
    # Build a lasso.Server configured from the 'sp' and 'idp' settings;
    # returns None when no SAML service provider is configured.
    if not get_cfg('sp'):
        return None
    import lasso

    server = lasso.Server(
        get_abs_path(get_cfg('sp')['saml2_metadata']), get_abs_path(get_cfg('sp')['privatekey']), None, None
    )
    server.signatureMethod = lasso.SIGNATURE_METHOD_RSA_SHA256

    # Set encryption private key
    encryption_privatekey = get_abs_path(get_cfg('sp').get('encryption_privatekey'))
    if encryption_privatekey and os.path.exists(encryption_privatekey):
        try:
            server.setEncryptionPrivateKey(encryption_privatekey)
        except lasso.Error:
            # best effort: keep the server usable without encryption
            get_logger().warning('Failed to set encryption private key')

    # register every configured identity provider (sorted for stable order)
    for klp, idp in sorted(get_cfg('idp', {}).items(), key=lambda k: k[0]):
        try:
            server.addProvider(
                lasso.PROVIDER_ROLE_IDP,
                get_abs_path(idp['metadata']),
                get_abs_path(idp.get('publickey')),
                get_abs_path(idp.get('cacertchain')),
            )
        except lasso.Error as error:
            # skip providers with incompatible protocol or broken metadata
            if error[0] == lasso.SERVER_ERROR_ADD_PROVIDER_PROTOCOL_MISMATCH:
                continue
            if error[0] == lasso.SERVER_ERROR_ADD_PROVIDER_FAILED:
                continue
            raise

        # optionally enable NameID encryption for this provider
        encryption_mode = lasso.ENCRYPTION_MODE_NONE
        if idp.get('encrypt_nameid', False):
            encryption_mode |= lasso.ENCRYPTION_MODE_NAMEID
        provider_t = get_provider(klp)
        provider = server.getProvider(provider_t.providerId)
        if provider is not None:
            provider.setEncryptionMode(encryption_mode)

    return server
|
|
|
|
|
|
def get_provider_label(provider):
    """Return a human label for a SAML provider.

    Uses the OrganizationDisplayName (or OrganizationName) found in the
    provider metadata, falling back to the provider id.  Returns None
    when no provider is given.
    """
    if not provider:
        return None
    if not hasattr(provider, 'getOrganization'):
        return provider.providerId

    organization = provider.getOrganization()
    if not organization:
        return provider.providerId

    name = re.findall("<OrganizationDisplayName.*>(.*?)</OrganizationDisplayName>", organization)
    if not name:
        name = re.findall("<OrganizationName.*>(.*?)</OrganizationName>", organization)
        if not name:
            return provider.providerId
    # re.findall() on a str returns str elements; the former
    # .decode('utf8').encode(site_charset) dance was a Python 2 leftover
    # and raised AttributeError under Python 3.
    return htmltext(name[0])
|
|
|
|
|
|
def get_provider(provider_key):
    """Load the lasso.Provider configured under *provider_key*; raise
    KeyError when the provider is unknown or its metadata is unusable."""
    idp_config = get_cfg('idp', {}).get(provider_key)
    if not idp_config:
        raise KeyError()

    import lasso

    publickey_fn = None
    if idp_config.get('publickey'):
        publickey_fn = get_abs_path(idp_config['publickey'])
    # cacertchain (not really necessary to get provider label)

    try:
        return lasso.Provider(
            lasso.PROVIDER_ROLE_IDP, get_abs_path(idp_config['metadata']), publickey_fn, None
        )
    except lasso.Error:
        raise KeyError()
|
|
|
|
|
|
def get_provider_key(provider_id):
    """Derive a filesystem-safe key from a provider id by replacing URL
    separator characters with dashes."""
    key = provider_id.replace('://', '-')
    for separator in ('/', '?', ':'):
        key = key.replace(separator, '-')
    return key
|
|
|
|
|
|
def simplify(s, space='-'):
    """Return an ASCII, lowercase, slug-like version of *s*; runs of
    whitespace and quotes are collapsed into *space*; None gives ''."""
    if s is None:
        return ''
    if not isinstance(s, str):
        # coerce non-strings to text using the site charset when available
        publisher = get_publisher()
        charset = publisher.site_charset if publisher else None
        s = force_text('%s' % s, charset or 'iso-8859-1', errors='ignore')
    s = force_text(unicodedata.normalize('NFKD', s).encode('ascii', 'ignore'))
    s = re.sub(r'[^\w\s\'%s]' % space, '', s).strip().lower()
    s = re.sub(r'[\s\'%s]+' % space, space, s)
    return s
|
|
|
|
|
|
def get_datetime_language():
    """Return the two-letter language code used for date formats, taken
    from site configuration or, failing that, the LC_TIME/LC_ALL
    environment variables; None when nothing is set."""
    lang = get_cfg('language', {}).get('language', None)
    if lang is not None:
        return lang
    for envvar in ('LC_TIME', 'LC_ALL'):
        value = os.environ.get(envvar)
        if value:
            return value[:2]
    return None
|
|
|
|
|
|
def strftime(fmt, dt):
    """Format *dt* (datetime, date, or 9-element time tuple) with *fmt*;
    return '' for empty values."""
    if not dt:
        return ''
    if isinstance(dt, datetime.datetime):
        # un-lazyfication: get a real datetime object in case dt is a
        # lazy datetime proxy
        dt = dt.replace()
    elif isinstance(dt, datetime.date):
        dt = datetime.datetime(dt.year, dt.month, dt.day)
    else:
        # consider it a 9 elements tuple
        dt = datetime.datetime(*dt[:6])
    return datetime_safe.strftime(dt, fmt)
|
|
|
|
|
|
def localstrftime(t):
    """Format *t* using the site datetime format, converting timezone
    aware datetimes to naive local time first; '' for empty values."""
    if not t:
        return ''
    value = t
    if isinstance(value, datetime.datetime) and is_aware(value):
        value = make_naive(value)
    return strftime(datetime_format(), value)
|
|
|
|
|
|
# Accepted date formats per language ('C' is the fallback); the first
# entry of each list is the canonical format used for output.
DATE_FORMATS = {
    'C': ['%Y-%m-%d', '%y-%m-%d'],
    'fr': ['%d/%m/%Y', '%d/%m/%y'],
}

# Accepted datetime formats per language; same convention as DATE_FORMATS.
DATETIME_FORMATS = {
    'C': [
        '%Y-%m-%d %H:%M',
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%dT%H:%M:%S',
        '%Y-%m-%dT%H:%M:%SZ',
        '%y-%m-%d %H:%M',
        '%y-%m-%d %H:%M:%S',
    ],
    'fr': [
        '%d/%m/%Y %H:%M',
        '%d/%m/%Y %H:%M:%S',
        '%d/%m/%Y %Hh%M',
        '%d/%m/%y %H:%M',
        '%d/%m/%y %H:%M:%S',
        '%d/%m/%y %Hh%M',
    ],
}
|
|
|
|
|
|
def datetime_format():
    """Return the preferred datetime format for the configured language,
    falling back to the 'C' locale."""
    lang = get_datetime_language()
    return DATETIME_FORMATS.get(lang, DATETIME_FORMATS['C'])[0]
|
|
|
|
|
|
def date_format():
    """Return the preferred date format for the configured language,
    falling back to the 'C' locale."""
    lang = get_datetime_language()
    return DATE_FORMATS.get(lang, DATE_FORMATS['C'])[0]
|
|
|
|
|
|
def get_as_datetime(s):
    """Parse *s* into a datetime, trying the current locale formats first
    and then every known date/datetime format; raise ValueError when no
    format matches."""
    formats = [datetime_format(), date_format()]  # prefer current locale
    for format_list in DATETIME_FORMATS.values():
        formats += format_list
    for format_list in DATE_FORMATS.values():
        formats += format_list
    last_error = ValueError()
    for candidate in formats:
        try:
            return datetime.datetime.strptime(s, candidate)
        except ValueError as e:
            last_error = e
    raise last_error
|
|
|
|
|
|
def site_encode(s):
    """Encode *s* to the site charset.

    None passes through; str values are returned unchanged (Python 3
    strings need no site encoding); anything else is coerced to text and
    encoded, returning bytes.
    """
    if s is None:
        return None
    if isinstance(s, str):
        return s
    # after the early return above s is guaranteed not to be a str, so the
    # former `if not isinstance(s, str)` re-check was dead code
    s = force_text(s)
    return s.encode(get_publisher().site_charset)
|
|
|
|
|
|
def ellipsize(s, length=30):
    """Truncate *s* to at most *length* characters, appending '(…)' when
    there is room for a marker."""
    text = force_text(s, get_publisher().site_charset, errors='replace')
    if not text or len(text) <= length:
        return force_str(text)
    if length > 3:
        text = Truncator(text).chars(length, truncate='(…)')
    else:
        text = text[:length]
    return force_str(text)
|
|
|
|
|
|
def get_month_name(month):
    """Return the localized name of *month* (1-12)."""
    names = (
        _('January'),
        _('February'),
        _('March'),
        _('April'),
        _('May'),
        _('June'),
        _('July'),
        _('August'),
        _('September'),
        _('October'),
        _('November'),
        _('December'),
    )
    return names[month - 1]
|
|
|
|
|
|
def format_time(datetime, formatstring, gmtime=False):
    # Format a timestamp (epoch number, or a 2/3/7+ element time tuple)
    # using a %-style format string whose keys are the *local variable
    # names* defined below (year, month, day, hour, minute, second,
    # weekday_name, lower_weekday_name, abbr_weekday_name, month_name,
    # lower_month_name, abbr_month_name), applied via locals(); do NOT
    # rename any local in this function, it would change the template
    # vocabulary.  Returns '?' for empty input.
    if not datetime:
        return '?'
    if type(datetime) in (int, float):
        # epoch timestamp: expand to a full time tuple
        if gmtime:
            datetime = time.gmtime(datetime)
        else:
            datetime = time.localtime(datetime)
    if len(datetime) == 2:
        year, month = datetime
        weekday = None
    elif len(datetime) == 3:
        year, month, day = datetime
        weekday = None
    else:
        year, month, day, hour, minute, second, weekday = datetime[:7]

    weekday_names = [
        _('Monday'),
        _('Tuesday'),
        _('Wednesday'),
        _('Thursday'),
        _('Friday'),
        _('Saturday'),
        _('Sunday'),
    ]

    if weekday is not None:
        weekday_name = weekday_names[weekday]
        lower_weekday_name = weekday_name.lower()
        abbr_weekday_name = weekday_name[:3]

    month_name = get_month_name(month)
    lower_month_name = month_name.lower()
    abbr_month_name = month_name[:3]

    # NOTE: a formatstring referencing weekday_name while weekday is None
    # raises KeyError here; callers are expected to match the tuple size.
    return formatstring % locals()
|
|
|
|
|
|
def _http_request(
    url, method='GET', body=None, headers=None, cert_file=None, timeout=None, raise_on_http_errors=False
):
    """Perform an HTTP request, returning (response, status, data, auth_header).

    Raises ConnectionError for non-http(s) URLs, timeouts, transport
    errors, and — when raise_on_http_errors is set — non-2xx statuses.
    """
    headers = headers or {}
    # refresh site settings before issuing the request
    get_publisher().reload_cfg()

    splitted_url = urllib.parse.urlsplit(url)
    if splitted_url.scheme not in ('http', 'https'):
        raise ConnectionError('invalid scheme in URL %s' % url)

    hostname = splitted_url.netloc
    timeout = timeout or settings.REQUESTS_TIMEOUT

    # re-use HTTP adapter to get connection pooling and keep-alive.
    adapter = getattr(get_publisher(), '_http_adapter', None)
    if adapter is None:
        adapter = get_publisher()._http_adapter = HTTPAdapter()

    # a fresh Session per call, but mounted on the shared adapter so the
    # underlying connection pool survives across requests
    session = requests.Session()
    session.mount('http://', adapter)
    session.mount('https://', adapter)

    try:
        response = session.request(
            method,
            url,
            headers=headers,
            data=body,
            timeout=timeout,
            cert=cert_file,
            proxies=settings.REQUESTS_PROXIES,
        )
    except requests.Timeout:
        raise ConnectionError('connection timed out while fetching the page')
    except requests.RequestException as err:
        raise ConnectionError('error in HTTP request to %s (%s)' % (hostname, err))
    else:
        data = response.content
        status = response.status_code
        auth_header = response.headers.get('WWW-Authenticate')

    if raise_on_http_errors and not (200 <= status < 300):
        raise ConnectionError('error in HTTP request to (status: %s)' % status)

    return response, status, data, auth_header
|
|
|
|
|
|
def urlopen(url, data=None):
    """Fetch *url* (POST when *data* is given, GET otherwise) and return
    the body wrapped in a BytesIO; raises ConnectionError on HTTP errors."""
    method = 'POST' if data is not None else 'GET'
    body = _http_request(url, method, body=data, raise_on_http_errors=True)[2]
    return io.BytesIO(body)
|
|
|
|
|
|
def http_get_page(url, **kwargs):
    """GET *url*; returns (response, status, data, auth_header)."""
    return _http_request(url, **kwargs)
|
|
|
|
|
|
def http_patch_request(url, body=None, **kwargs):
    """PATCH *url* with *body*; returns (response, status, data, auth_header)."""
    return _http_request(url, 'PATCH', body, **kwargs)
|
|
|
|
|
|
def http_post_request(url, body=None, **kwargs):
    """POST *body* to *url*; returns (response, status, data, auth_header)."""
    return _http_request(url, 'POST', body, **kwargs)
|
|
|
|
|
|
def http_delete_request(url, **kwargs):
    """DELETE *url*; returns (response, status, data, auth_header)."""
    return _http_request(url, 'DELETE', **kwargs)
|
|
|
|
|
|
def get_variadic_url(url, variables, encode_query=True):
    """Substitute *variables* into a templated URL.

    Handles both Django templates ({{...}}/{%...%}) and legacy ezt
    templates ([...]), quoting the substituted path/query/fragment parts
    where it is safe to do so.  Plain strings are returned unchanged.
    """
    if not Template.is_template_string(url):
        return url

    # django template
    if '{{' in url or '{%' in url:
        try:
            url = Template(url).render(variables)
            p = urllib.parse.urlsplit(url)
            scheme, netloc, path, query, fragment = (p.scheme, p.netloc, p.path, p.query, p.fragment)
            if path.startswith('//'):
                # don't let double slash happen at the root of the URL, this
                # happens when a template such as {{url}}/path is used (with
                # {{url}} already ending with a slash).
                path = path[1:]
            return urllib.parse.urlunsplit((scheme, netloc, path, query, fragment))
        except (TemplateSyntaxError, VariableDoesNotExist):
            return url

    # ezt template, try to be safe
    def ezt_substitute(template, variables):
        # run the ezt engine over a URL fragment
        tmpl = ezt.Template()
        tmpl.parse(template)
        fd = io.StringIO()
        tmpl.generate(fd, variables)
        return fd.getvalue()

    def partial_quote(string):
        # unquote brackets, as there may be further processing that needs them
        # intact.
        return urllib.parse.quote(string).replace('%5B', '[').replace('%5D', ']')

    p = urllib.parse.urlsplit(url)
    scheme, netloc, path, query, fragment = p.scheme, p.netloc, p.path, p.query, p.fragment
    if netloc and '[' in netloc:
        netloc = ezt_substitute(netloc, variables)
    if path and '[' in path:
        if scheme == '' and netloc == '':
            # this happened because the variable was set in the scheme
            # (ex: http[https]://www.example.net) or because the value starts
            # with a variable name (ex: [url]); in that situation we do not
            # quote at all.
            if path.count('//') == 1:
                # there were no / in the original path (the two / comes from
                # the scheme/netloc separation, this means there is no path)
                before_path = ezt_substitute(path, variables)
                p2 = urllib.parse.urlsplit(before_path)
                scheme, netloc, path = p2.scheme, p2.netloc, p2.path
            else:
                # there is a path, we need to get back to the original URL and
                # split it on the last /, to isolate the path part.
                lastslash = '/' if path.endswith('/') else ''
                if '/' in path:
                    before_path, path = path.rsplit('/', 1)
                else:
                    before_path, path = path, ''
                before_path = ezt_substitute(before_path, variables)
                p2 = urllib.parse.urlsplit(before_path)
                scheme, netloc = p2.scheme, p2.netloc
                if p2.path:
                    # the substituted value carried a path (and maybe a
                    # query string) of its own: merge them back
                    if not path:
                        path, query2 = p2.path + lastslash, p2.query
                    else:
                        path, query2 = p2.path + '/' + path, p2.query
                    if query and query2:
                        query += '&' + query2
                    else:
                        query = query or query2
        if path:
            path = partial_quote(ezt_substitute(path, variables))
        if not path:
            path = '/'
        if path.startswith('//'):
            path = path[1:]
    if fragment and '[' in fragment:
        fragment = partial_quote(ezt_substitute(fragment, variables))
    if query and '[' in query:
        p_qs = urllib.parse.parse_qsl(query)
        if len(p_qs) == 0:
            # this happened because the query string has no key/values,
            # probably because it's a single substitution variable (ex:
            # http://www.example.net/foobar?[query])
            query = ezt_substitute(query, variables)
        else:
            query = []
            for k, v in p_qs:
                if '[' in k:
                    k = ezt_substitute(k, variables)
                if '[' in v:
                    v = ezt_substitute(v, variables)
                query.append((k, v))
            if encode_query:
                query = urllib.parse.urlencode(query)
            else:
                query = '&'.join('%s=%s' % (k, v) for (k, v) in query)
    return urllib.parse.urlunsplit((scheme, netloc, path, query, fragment))
|
|
|
|
|
|
def get_foreground_colour(background_colour):
    """Calculates the luminance of the given colour (six hexadecimal digits)
    and returns an appropriate foreground colour ('black' or 'white')."""
    # luminance coefficients taken from section C-9 from
    # http://www.faqs.org/faqs/graphics/colorspace-faq/
    coefficients = (0.212671, 0.715160, 0.072169)
    luminance = sum(
        int(background_colour[2 * i : 2 * i + 2], 16) * coefficient
        for i, coefficient in enumerate(coefficients)
    )
    return 'black' if luminance > 128 else 'white'
|
|
|
|
|
|
def C_(msg):
    '''Translates and removes context from message'''
    translated = _(msg)
    return translated.split('|', 1)[1]
|
|
|
|
|
|
def indent_xml(elem, level=0):
    # in-place prettyprint formatter
    # http://effbot.org/zone/element-lib.htm#prettyprint
    i = "\n" + level * " "
    if len(elem):
        # element with children: indent its text and each child's tail
        if not elem.text or not elem.text.strip():
            elem.text = i + " "
        # NOTE: the loop deliberately rebinds `elem` to the last child, so
        # the tail fixup below applies to that last child
        for elem in elem:
            indent_xml(elem, level + 1)
        if not elem.tail or not elem.tail.strip():
            elem.tail = i
    else:
        if level and (not elem.tail or not elem.tail.strip()):
            elem.tail = i
    return elem
|
|
|
|
|
|
def xml_node_text(node):
    """Return the text content of an XML node, or None when the node or
    its text is missing."""
    if node is None:
        return None
    if node.text is None:
        return None
    return force_str(node.text)
|
|
|
|
|
|
def preprocess_struct_time(obj):
    """Recursively convert time.struct_time values (also inside lists and
    dicts) into datetime.date (for midnight values) or datetime.datetime."""
    if isinstance(obj, time.struct_time):
        converted = datetime.datetime(*obj[:6])
        if (converted.hour, converted.minute, converted.second) == (0, 0, 0):
            return converted.date()
        return converted
    if isinstance(obj, list):
        return [preprocess_struct_time(item) for item in obj]
    if isinstance(obj, dict):
        return {
            preprocess_struct_time(key): preprocess_struct_time(value)
            for key, value in obj.items()
        }
    return obj
|
|
|
|
|
|
class JSONEncoder(json.JSONEncoder):
    # JSON encoder with support for the extra types handled by the
    # application: dates, datetimes, decimals, bytes, uploaded files and
    # lazy translation strings.

    def encode(self, obj):
        # convert any embedded time.struct_time values up-front, as they
        # would otherwise be serialized as plain tuples by the base class
        return super().encode(preprocess_struct_time(obj))

    def default(self, obj):
        # make sure time.struct_time are not received as they do have a
        # default serializer in Python 3 (they are tuples) and should
        # be converted to datetime objects beforehand.
        assert not isinstance(obj, time.struct_time), 'time.struct_time should not be serialized'

        # checked before datetime.date as datetime is a date subclass
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()

        if isinstance(obj, datetime.date):
            return obj.strftime('%Y-%m-%d')

        if isinstance(obj, decimal.Decimal):
            # serialized through Django's localize (locale-aware string)
            return localize(obj)

        if isinstance(obj, bytes):
            return obj.decode('ascii')

        if hasattr(obj, 'get_json_value'):
            return obj.get_json_value()

        if hasattr(obj, 'base_filename'):
            # uploaded file object: inline its content as base64
            return {
                'filename': obj.base_filename,
                'content_type': obj.content_type or 'application/octet-stream',
                'content': base64.b64encode(obj.get_content()),
            }

        if obj.__class__.__name__ == '__proxy__':
            # lazy gettext
            return str(obj)

        # Let the base class default method raise the TypeError
        return json.JSONEncoder.default(self, obj)
|
|
|
|
|
|
def json_encode_helper(d, charset):
    '''Encode a JSON structure into local charset'''
    # no-op since Python 3 (strings are already unicode); kept for
    # backward compatibility with existing callers
    return d
|
|
|
|
|
|
def json_loads(value, charset=None):
    # charset is ignored; the parameter is kept for backward
    # compatibility with Python 2 era callers
    return json.loads(force_text(value))
|
|
|
|
|
|
def json_response(data):
    """Serialize *data* to JSON for the current HTTP response, setting
    CORS headers for cross-origin callers and wrapping the payload when a
    JSONP callback parameter is present."""
    response = get_response()
    response.set_content_type('application/json')
    origin = get_request().get_environ('HTTP_ORIGIN')
    if origin:
        response.set_header('Access-Control-Allow-Origin', origin)
        response.set_header('Access-Control-Allow-Credentials', 'true')
        response.set_header('Access-Control-Allow-Headers', 'x-requested-with')
    json_str = json.dumps(data, cls=JSONEncoder)
    for variable in ('jsonpCallback', 'callback'):
        if variable in get_request().form:
            response.set_content_type('application/javascript')
            json_str = '%s(%s);' % (get_request().form[variable], json_str)
            break
    return json_str
|
|
|
|
|
|
def parse_isotime(s):
    """Parse a UTC ISO8601 timestamp ('%Y-%m-%dT%H:%M:%SZ') into an epoch
    integer."""
    s = s.replace('+00:00Z', 'Z')  # clean lemonldap dates with both timezone and Z
    parsed = time.strptime(s, '%Y-%m-%dT%H:%M:%SZ')
    return calendar.timegm(parsed)
|
|
|
|
|
|
def file_digest(content, chunk_size=100000):
    """Return the SHA-256 hex digest of a seekable binary file-like
    object, read in chunks of *chunk_size* bytes."""
    digest = hashlib.sha256()
    content.seek(0)
    while True:
        chunk = content.read(chunk_size)
        if not chunk:
            break
        digest.update(chunk)
    return digest.hexdigest()
|
|
|
|
|
|
def can_thumbnail(content_type):
    """Tell whether a thumbnail can be generated for *content_type*:
    PDFs need both pdftoppm and PIL, images need PIL."""
    if content_type == 'application/pdf':
        return bool(HAS_PDFTOPPM and Image)
    if content_type and content_type.startswith('image/'):
        return Image is not None
    return False
|
|
|
|
|
|
def get_thumbnail(filepath, content_type=None):
    """Return PNG thumbnail bytes for the file at *filepath*.

    Thumbnails are cached under <app_dir>/thumbs, keyed by a hash of the
    file path.  Raises ThumbnailError when the content type is not
    supported or generation fails.
    """
    if not can_thumbnail(content_type or ''):
        raise ThumbnailError()

    # check if thumbnail already exists
    thumbs_dir = os.path.join(get_publisher().app_dir, 'thumbs')
    if not os.path.exists(thumbs_dir):
        os.mkdir(thumbs_dir)
    thumb_filepath = os.path.join(thumbs_dir, hashlib.sha256(force_bytes(filepath)).hexdigest())
    if os.path.exists(thumb_filepath):
        with open(thumb_filepath, 'rb') as f:
            return f.read()

    # generate thumbnail
    if content_type == 'application/pdf':
        # convert the PDF to 500px-wide PNG data with pdftoppm
        try:
            fp = io.BytesIO(
                subprocess.check_output(
                    ['pdftoppm', '-png', '-scale-to-x', '500', '-scale-to-y', '-1', filepath]
                )
            )
        except subprocess.CalledProcessError:
            raise ThumbnailError()
    else:
        fp = open(filepath, 'rb')  # pylint: disable=consider-using-with
    try:
        image = Image.open(fp)
        try:
            exif = image._getexif()
        except Exception:
            # any EXIF parsing failure is treated as "no EXIF data"
            exif = None

        if exif:
            # orientation code from sorl.thumbnail (engines/pil_engine.py)
            orientation = exif.get(EXIF_ORIENTATION)

            if orientation == 2:
                image = image.transpose(Image.FLIP_LEFT_RIGHT)
            elif orientation == 3:
                image = image.rotate(180)
            elif orientation == 4:
                image = image.transpose(Image.FLIP_TOP_BOTTOM)
            elif orientation == 5:
                image = image.rotate(-90, expand=1).transpose(Image.FLIP_LEFT_RIGHT)
            elif orientation == 6:
                image = image.rotate(-90, expand=1)
            elif orientation == 7:
                image = image.rotate(90, expand=1).transpose(Image.FLIP_LEFT_RIGHT)
            elif orientation == 8:
                image = image.rotate(90, expand=1)

        try:
            image.thumbnail((500, 300))
        except SyntaxError:
            # PIL can raise syntax error on broken PNG files
            # * File "PIL/PngImagePlugin.py", line 119, in read
            # * raise SyntaxError("broken PNG file (chunk %s)" % repr(cid))
            raise IOError
        image_thumb_fp = io.BytesIO()
        image.save(image_thumb_fp, "PNG")
    except IOError:
        # failed to create thumbnail.
        raise ThumbnailError()

    # store thumbnail
    with open(thumb_filepath, 'wb') as f:
        f.write(image_thumb_fp.getvalue())

    return image_thumb_fp.getvalue()
|
|
|
|
|
|
def normalize_geolocation(lat_lon):
    '''Fit lat into -90/90 and lon into -180/180'''

    def wrap(value, lower, upper):
        span = upper - lower
        return ((value - lower) % span + span) % span + lower

    lat = wrap(
        decimal.Decimal(lat_lon['lat']), decimal.Decimal('-90.0'), decimal.Decimal('90.0')
    )
    lon = wrap(
        decimal.Decimal(lat_lon['lon']), decimal.Decimal('-180.0'), decimal.Decimal('180.0')
    )
    return {'lat': float(lat), 'lon': float(lon)}
|
|
|
|
|
|
def html2text(text):
    """Strip HTML tags from *text* and unescape entities, returning the
    site-encoded plain text."""
    if isinstance(text, (htmltext, str)):
        text = force_text(str(text), get_publisher().site_charset)
    stripped = strip_tags(text)
    return site_encode(html.unescape(stripped))
|
|
|
|
|
|
def validate_luhn(string_value, length=None):
    '''Verify Luhn checksum on a string representing a number'''
    if not string_value:
        return False
    if length is not None and len(string_value) != length:
        return False
    if not is_ascii_digit(string_value):
        return False

    # take all digits counting from the right, double value for digits pair
    # index (counting from 1), if double has 2 digits take their sum
    total = 0
    for position, digit in enumerate(reversed(string_value)):
        value = int(digit)
        if position % 2:
            value = sum(int(c) for c in str(2 * value))
        total += value
    return total % 10 == 0
|
|
|
|
|
|
def is_ascii_digit(string_value):
    """True when *string_value* is non-empty and made only of ASCII
    digits; empty/None input is returned as-is (falsy)."""
    if not string_value:
        return string_value
    return all('0' <= char <= '9' for char in string_value)
|
|
|
|
|
|
def validate_phone_fr(string_value):
    """Validate a French phone number: a leading zero followed by digits,
    dots, or spaces, with exactly ten digits overall."""
    if not re.match(r'^0[\d\.\s]+$', string_value):
        # leading zero, then digits, dots, or spaces
        return False
    digit_count = sum(1 for char in string_value if is_ascii_digit(char))
    return digit_count == 10
|
|
|
|
|
|
def validate_siren(string_value):
    """Validate a SIREN (9-digit French business identifier, Luhn-checked)."""
    return validate_luhn(string_value, length=9)
|
|
|
|
|
|
def validate_siret(string_value):
    """Validate a SIRET (14-digit French establishment identifier)."""
    if not is_ascii_digit(string_value):
        return False
    # special case: La Poste establishments don't follow Luhn; their digit
    # sum must simply be a multiple of 5
    is_la_poste = string_value.startswith('356000000') and len(string_value) == 14
    if is_la_poste and sum(int(digit) for digit in string_value) % 5 == 0:
        return True
    return validate_luhn(string_value, length=14)
|
|
|
|
|
|
def validate_nir(string_value):
    '''https://fr.wikipedia.org/wiki/Num%C3%A9ro_de_s%C3%A9curit%C3%A9_sociale_en_France'''
    if not string_value or len(string_value) != 15:
        return False
    if string_value[0] == '0':  # sex
        return False
    if string_value[7:10] == '000':  # municipality
        return False
    if string_value[10:13] == '000':  # order
        return False
    # Corsican department codes use letters; the checksum algorithm maps
    # them to specific numbers
    department = string_value[5:7]
    if department == '2A':
        string_value = string_value.replace('2A', '19', 1)
    elif department == '2B':
        string_value = string_value.replace('2B', '18', 1)
    if not is_ascii_digit(string_value):
        return False
    month = int(string_value[3:5])
    valid_months = set(range(1, 13)) | {20} | set(range(30, 43))
    if month < 50 and month not in valid_months:
        return False
    key = int(string_value[13:])
    return key == 97 - int(string_value[:13]) % 97
|
|
|
|
|
|
# Expected total IBAN length per ISO country code; used by validate_iban().
IBAN_LENGTH = {
    # from https://www.iban.com/structure
    'AD': 24,
    'AE': 23,
    'AL': 28,
    'AT': 20,
    'AZ': 28,
    'BA': 20,
    'BE': 16,
    'BG': 22,
    'BH': 22,
    'BR': 29,
    'BY': 28,
    'CH': 21,
    'CR': 22,
    'CY': 28,
    'CZ': 24,
    'DE': 22,
    'DK': 18,
    'DO': 28,
    'EE': 20,
    'EG': 29,
    'ES': 24,
    'FI': 18,
    'FO': 18,
    'FR': 27,
    'GB': 22,
    'GE': 22,
    'GI': 23,
    'GL': 18,
    'GR': 27,
    'GT': 28,
    'HR': 21,
    'HU': 28,
    'IE': 22,
    'IL': 23,
    'IQ': 23,
    'IS': 26,
    'IT': 27,
    'JO': 30,
    'KW': 30,
    'KZ': 20,
    'LB': 28,
    'LC': 32,
    'LI': 21,
    'LT': 20,
    'LU': 20,
    'LV': 21,
    'MC': 27,
    'MD': 24,
    'ME': 22,
    'MK': 19,
    'MR': 27,
    'MT': 31,
    'MU': 30,
    'NL': 18,
    'NO': 15,
    'PK': 24,
    'PL': 28,
    'PS': 29,
    'PT': 25,
    'QA': 29,
    'RO': 24,
    'RS': 22,
    'SA': 24,
    'SC': 31,
    'SE': 24,
    'SI': 19,
    'SK': 24,
    'SM': 27,
    'ST': 25,
    'SV': 28,
    'TL': 23,
    'TN': 24,
    'TR': 26,
    'UA': 29,
    'VA': 22,
    'VG': 24,
    'XK': 20,
    # FR includes:
    'GF': 27,
    'GP': 27,
    'MQ': 27,
    'RE': 27,
    'PF': 27,
    'TF': 27,
    'YT': 27,
    'NC': 27,
    'BL': 27,
    'MF': 27,
    'PM': 27,
    'WF': 27,
    # GB includes:
    'IM': 22,
    'GG': 22,
    'JE': 22,
    # FI includes:
    'AX': 18,
    # ES includes:
    'IC': 24,
    'EA': 24,
}
|
|
|
|
|
|
def validate_iban(string_value):
    '''https://fr.wikipedia.org/wiki/International_Bank_Account_Number'''
    if not string_value:
        return False
    country_code = string_value[:2]
    check_digits = string_value[2:4]
    bban = string_value[4:]
    if not (country_code.isalpha() and country_code.isupper()):
        return False
    expected_length = IBAN_LENGTH.get(country_code)
    if expected_length and len(string_value) != expected_length:
        return False
    if not is_ascii_digit(check_digits):
        return False
    if not bban or is_ascii_digit(bban) and int(bban) == 0:
        # bban is empty or a list of 0
        return False
    # move the country code and a '00' key to the end, turn letters into
    # numbers (A=10 .. Z=35), then verify the mod-97 key
    rearranged = bban + country_code + '00'
    converted = ''
    for char in rearranged:
        if 'A' <= char <= 'Z':
            converted += str(ord(char) - ord('A') + 10)
        else:
            converted += char
    if not is_ascii_digit(converted):
        return False
    return int(check_digits) == 98 - int(converted) % 97
|
|
|
|
|
|
def get_int_or_400(value):
    """Convert *value* to int, letting None pass through; raise
    RequestError (HTTP 400) on unparsable values."""
    if value is None:
        return None
    try:
        return int(value)
    except ValueError:
        # not a number: reject the request
        raise RequestError()
|
|
|
|
|
|
def get_order_by_or_400(value):
    """Validate an ordering parameter (an identifier, optionally prefixed
    with '-'); raise RequestError (HTTP 400) for anything else."""
    if value is None:
        return None
    if re.match(r'-?[a-z0-9_-]+$', value):
        return value
    raise RequestError()
|
|
|
|
|
|
class QLookupRedirect:
    """
    Class to use to interrupt a _q_lookup method and redirect.
    """

    def __init__(self, url):
        # url: target of the redirect performed at traversal time
        self.url = url

    def _q_traverse(self, path):
        # quixote traversal hook: ignore the remaining path and redirect
        return redirect(self.url)
|
|
|
|
|
|
def get_document_types(current_document_type):
    """Return known document (file) types keyed by identifier: built-in
    audio/video/image wildcards plus locally configured types; the current
    type is kept even when removed from the settings."""
    document_types = {
        '_audio': {
            'label': _('Sound files'),
            'mimetypes': ['audio/*'],
        },
        '_video': {
            'label': _('Video files'),
            'mimetypes': ['video/*'],
        },
        '_image': {
            'label': _('Image files'),
            'mimetypes': ['image/*'],
        },
    }
    # Local document types
    document_types.update(get_cfg('filetypes', {}))
    for identifier, document_type in document_types.items():
        document_type['id'] = identifier
        document_type['label'] = str(document_type['label'])
    # add current file type if it does not exist anymore in the settings
    if current_document_type and current_document_type['id'] not in document_types:
        document_types[current_document_type['id']] = current_document_type
    return document_types
|
|
|
|
|
|
def get_document_type_value_options(current_document_type):
    """Build (value, label, key) option tuples for a document type widget,
    starting with an empty '---' choice."""
    options = [({}, '---', '')]
    for key, doc_type in get_document_types(current_document_type).items():
        options.append((doc_type, doc_type['label'], key))
    return options
|
|
|
|
|
|
def xml_response(obj, filename, content_type='text/xml'):
    """Serialize *obj* to pretty-printed XML and prepare the current HTTP
    response as a file download named *filename*."""
    etree = obj.export_to_xml(include_id=True)
    if hasattr(obj, 'get_admin_url'):
        etree.attrib['url'] = obj.get_admin_url()
    indent_xml(etree)

    response = get_response()
    response.set_content_type(content_type)
    response.set_header('content-disposition', 'attachment; filename=%s' % filename)

    serialized = ET.tostring(etree).decode('utf-8')
    return '<?xml version="1.0"?>\n' + serialized
|