wcs/wcs/qommon/misc.py

652 lines
22 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import datetime
import decimal
import calendar
import re
import os
import time
import base64
import json
import subprocess
import tempfile
import unicodedata
import urllib
import hashlib
import requests
try:
from PIL import Image
except ImportError:
Image = None
from django.conf import settings
from django.utils import datetime_safe
from django.utils.html import strip_tags
from django.template import engines, TemplateSyntaxError, VariableDoesNotExist
from django.utils.six.moves.html_parser import HTMLParser
from django.utils.six.moves.urllib.parse import quote, urlencode
from django.utils.six.moves.urllib import parse as urlparse
from quixote import get_publisher, get_response, get_request
from quixote.html import htmltext
from qommon import _
from qommon import get_cfg, get_logger, ezt
from qommon.errors import ConnectionError
from qommon.template import Template
from django.utils.six import StringIO
# Detect once, at import time, whether the GraphicsMagick "gm" binary can be
# found on the PATH.
try:
    # the null device is only used to silence the output of "which"; the
    # context manager makes sure the handle is closed afterwards.
    with open(os.devnull, 'w') as _devnull:
        subprocess.check_call(['which', 'gm'], stdout=_devnull)
    HAS_GM = True
except (subprocess.CalledProcessError, OSError):
    # CalledProcessError: "which" did not find gm.
    # OSError: "which" itself (or the null device) is not available.
    HAS_GM = False

# numeric identifier of the EXIF "Orientation" tag
EXIF_ORIENTATION = 0x0112
class ThumbnailError(Exception):
    """Raised when a thumbnail cannot be produced."""
def get_abs_path(s):
    """Return *s* anchored to the publisher application directory.

    Empty values (None, '') and values starting with "/" are returned
    unchanged; anything else is joined to ``get_publisher().app_dir``.
    """
    if s and s[0] != '/':
        return os.path.join(get_publisher().app_dir, s)
    return s
def get_lasso_server():
    """Build a lasso server object from the 'sp' and 'idp' configuration.

    Returns None when no service provider ('sp') configuration exists.
    Identity providers are added in the order of their configuration key;
    providers rejected by lasso with a "protocol mismatch" or "add provider
    failed" error are skipped, any other lasso error is propagated.
    """
    if not get_cfg('sp'):
        return None

    import lasso

    metadata_path = get_abs_path(get_cfg('sp')['saml2_metadata'])
    privatekey_path = get_abs_path(get_cfg('sp')['privatekey'])
    server = lasso.Server(metadata_path, privatekey_path, None, None)
    server.signatureMethod = lasso.SIGNATURE_METHOD_RSA_SHA256

    # Set encryption private key
    encryption_privatekey = get_abs_path(get_cfg('sp').get('encryption_privatekey'))
    if encryption_privatekey and os.path.exists(encryption_privatekey):
        try:
            server.setEncryptionPrivateKey(encryption_privatekey)
        except lasso.Error:
            # best effort: the server stays usable without the encryption key
            get_logger().warn('Failed to set encryption private key')

    idp_items = sorted(get_cfg('idp', {}).items(), key=lambda item: item[0])
    for klp, idp in idp_items:
        try:
            server.addProvider(
                lasso.PROVIDER_ROLE_IDP,
                get_abs_path(idp['metadata']),
                get_abs_path(idp.get('publickey')),
                get_abs_path(idp.get('cacertchain')))
        except lasso.Error as error:
            if error[0] in (lasso.SERVER_ERROR_ADD_PROVIDER_PROTOCOL_MISMATCH,
                            lasso.SERVER_ERROR_ADD_PROVIDER_FAILED):
                # this provider cannot be used, go on with the next one
                continue
            raise

        if not hasattr(lasso, 'ENCRYPTION_SYM_KEY_TYPE_DEFAULT'):
            # this lasso module does not expose the encryption constants,
            # there is nothing more to configure for this provider
            continue

        encryption_mode = lasso.ENCRYPTION_MODE_NONE
        if idp.get('encrypt_nameid', False):
            encryption_mode |= lasso.ENCRYPTION_MODE_NAMEID
        standalone_provider = get_provider(klp)
        registered_provider = server.getProvider(standalone_provider.providerId)
        if registered_provider is not None:
            registered_provider.setEncryptionMode(encryption_mode)

    return server
def get_provider_label(provider):
    """Return a display label for *provider*.

    The label is the organization display name found in the provider
    organization markup, or else its organization name; the provider id is
    used when the provider exposes no organization or no name is found.
    Returns None for a false provider.
    """
    if not provider:
        return None

    organization = None
    if hasattr(provider, str('getOrganization')):
        organization = provider.getOrganization()
    if not organization:
        return provider.providerId

    # patterns are tried in order, the first one giving a result wins
    name_patterns = (
        "<OrganizationDisplayName.*>(.*?)</OrganizationDisplayName>",
        "<OrganizationName.*>(.*?)</OrganizationName>",
    )
    for pattern in name_patterns:
        found = re.findall(pattern, organization)
        if found:
            return htmltext(found[0].decode('utf8').encode(get_publisher().site_charset))
    return provider.providerId
def get_provider(provider_key):
    """Return a lasso identity provider object for *provider_key*.

    Raises KeyError when the key has no entry in the 'idp' configuration or
    when lasso fails to create the provider.
    """
    idp_cfg = get_cfg('idp', {}).get(provider_key)
    if not idp_cfg:
        raise KeyError()

    import lasso

    if idp_cfg.get('publickey'):
        publickey_fn = get_abs_path(idp_cfg['publickey'])
    else:
        publickey_fn = None

    # the cacertchain is not passed (not really necessary to get provider
    # label)
    try:
        return lasso.Provider(
            lasso.PROVIDER_ROLE_IDP,
            get_abs_path(idp_cfg['metadata']),
            publickey_fn,
            None)
    except lasso.Error:
        raise KeyError()
def get_provider_key(provider_id):
    """Turn a provider id into a key by replacing URL separators by dashes."""
    key = provider_id
    # order matters: '://' has to be handled before its own characters
    for separator in ('://', '/', '?', ':'):
        key = key.replace(separator, '-')
    return key
def simplify(s, space='-'):
    """Return a lowercase ASCII identifier built from *s*.

    Accents are dropped, characters that are neither word characters,
    whitespace, quotes nor *space* are removed, and runs of whitespace,
    quotes and *space* are collapsed into a single *space*.  None gives ''.
    """
    if s is None:
        return ''
    if not isinstance(s, unicode):
        publisher = get_publisher()
        if publisher and publisher.site_charset:
            charset = publisher.site_charset
        else:
            charset = 'iso-8859-1'
        s = unicode('%s' % s, charset, 'ignore')
    # decompose accented characters then drop everything that is not ASCII
    ascii_value = unicodedata.normalize('NFKD', s).encode('ascii', 'ignore')
    cleaned = re.sub(r'[^\w\s\'%s]' % space, '', ascii_value).strip().lower()
    return re.sub(r'[\s\'%s]+' % space, space, cleaned)
def get_datetime_language():
    """Return the language code to use for date and time formats.

    The configured language is used when set; otherwise the first two
    characters of the LC_TIME, then LC_ALL, environment variables.  Returns
    None when none of them is set.
    """
    lang = get_cfg('language', {}).get('language', None)
    if lang is not None:
        return lang
    for variable in ('LC_TIME', 'LC_ALL'):
        locale_value = os.environ.get(variable)
        if locale_value:
            return locale_value[:2]
    return None
def strftime(fmt, dt):
    """Format *dt* according to *fmt*.

    *dt* can be a datetime, a date or a time tuple; a false value gives an
    empty string.
    """
    if not dt:
        return ''
    if isinstance(dt, datetime.datetime):
        # un-lazyfication: get a real datetime object, in case of a lazy
        # datetime object
        dt = dt.replace()
    elif isinstance(dt, datetime.date):
        dt = datetime.datetime(dt.year, dt.month, dt.day)
    else:
        # consider it a 9 elements tuple
        dt = datetime.datetime(*dt[:6])
    return datetime_safe.strftime(dt, fmt)
def localstrftime(t):
    """Format *t* with the datetime format of the current language.

    A false value gives an empty string.
    """
    return strftime(datetime_format(), t) if t else ''
# Accepted date formats, per language code; the first entry of each list is
# the one used for output.
DATE_FORMATS = {
    'C': [
        '%Y-%m-%d',
        '%y-%m-%d',
    ],
    'fr': [
        '%d/%m/%Y',
        '%d/%m/%y',
    ],
}

# Accepted date and time formats, per language code; the first entry of each
# list is the one used for output.
DATETIME_FORMATS = {
    'C': [
        '%Y-%m-%d %H:%M',
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%dT%H:%M:%S',
        '%Y-%m-%dT%H:%M:%SZ',
        '%y-%m-%d %H:%M',
        '%y-%m-%d %H:%M:%S',
    ],
    'fr': [
        '%d/%m/%Y %H:%M',
        '%d/%m/%Y %H:%M:%S',
        '%d/%m/%Y %Hh%M',
        '%d/%m/%y %H:%M',
        '%d/%m/%y %H:%M:%S',
        '%d/%m/%y %Hh%M',
    ],
}
def datetime_format():
    """Return the preferred datetime format of the current language.

    Languages without an entry in DATETIME_FORMATS use the 'C' formats.
    """
    known_formats = DATETIME_FORMATS.get(get_datetime_language(), DATETIME_FORMATS['C'])
    return known_formats[0]
def date_format():
    """Return the preferred date format of the current language.

    Languages without an entry in DATE_FORMATS use the 'C' formats.
    """
    known_formats = DATE_FORMATS.get(get_datetime_language(), DATE_FORMATS['C'])
    return known_formats[0]
def get_as_datetime(s):
    """Parse *s* into a datetime, trying every known format.

    The formats of the current language are tried first, then all datetime
    formats, then all date formats.  Raises ValueError when none matches.
    """
    candidates = [datetime_format(), date_format()]  # prefer current locale
    for formats_table in (DATETIME_FORMATS, DATE_FORMATS):
        for language_formats in formats_table.values():
            candidates.extend(language_formats)

    for candidate in candidates:
        try:
            return datetime.datetime.strptime(s, candidate)
        except ValueError:
            continue
    raise ValueError()
def site_encode(s):
    """Return *s* as a byte string encoded in the site charset.

    None and str values are returned unchanged; other objects are converted
    to unicode first.
    """
    if s is None or isinstance(s, str):
        return s
    text = s if isinstance(s, unicode) else unicode(s)
    return text.encode(get_publisher().site_charset)
def ellipsize(s, length=30):
    """Shorten *s* and mark the cut with ' (...)'.

    *s* is a unicode string or a byte string in the site charset; the result
    is a byte string in the site charset.  Strings of at most *length*
    characters are returned whole; longer ones are cut to ``length - 5``
    characters and get ' (...)' appended.
    """
    charset = get_publisher().site_charset
    # isinstance() (rather than an exact type check) so that unicode
    # subclasses are not passed again to unicode(), which would fail.
    if not isinstance(s, unicode):
        s = unicode(s, charset, 'replace')
    # "<=": a string of exactly *length* characters already fits, cutting it
    # would produce a longer result than the original string.
    if not s or len(s) <= length:
        return s.encode(charset)
    return s[:length - 5].encode(charset) + ' (...)'
def get_month_name(month):
    """Return the translated name of *month* (1 is January)."""
    translated_months = (
        _('January'),
        _('February'),
        _('March'),
        _('April'),
        _('May'),
        _('June'),
        _('July'),
        _('August'),
        _('September'),
        _('October'),
        _('November'),
        _('December'),
    )
    return translated_months[month - 1]
def format_time(datetime, formatstring, gmtime = False):
    """Format a point in time using a %(name)s style format string.

    *datetime* is a timestamp (int or float), a (year, month) tuple, a
    (year, month, day) tuple, or a longer sequence whose first seven items
    are year, month, day, hour, minute, second and weekday.  Timestamps are
    converted with time.gmtime() when *gmtime* is true and time.localtime()
    otherwise.  A false value gives '?'.

    *formatstring* is interpolated with the local variables of this
    function, so the names it can reference are the local names below
    (year, month, month_name, abbr_month_name, weekday_name, ...).  Only the
    names that could be computed from *datetime* are defined.
    """
    # NOTE: the "datetime" parameter shadows the datetime module in here.
    if not datetime:
        return '?'
    if type(datetime) in (int, float):
        if gmtime:
            datetime = time.gmtime(datetime)
        else:
            datetime = time.localtime(datetime)
    if len(datetime) == 2:
        year, month = datetime
        weekday = None
    elif len(datetime) == 3:
        year, month, day = datetime
        weekday = None
    else:
        year, month, day, hour, minute, second, weekday = datetime[:7]
    # index 0 is Monday, as in time.struct_time.tm_wday
    weekday_names = [_('Monday'), _('Tuesday'), _('Wednesday'),
            _('Thursday'), _('Friday'), _('Saturday'), _('Sunday')]
    if weekday is not None:
        weekday_name = weekday_names[weekday]
        lower_weekday_name = weekday_name.lower()
        abbr_weekday_name = weekday_name[:3]
    month_name = get_month_name(month)
    lower_month_name = month_name.lower()
    abbr_month_name = month_name[:3]
    # every local name is exposed to the format string; do not rename or
    # remove locals without checking the format strings in use.
    return formatstring % locals()
def _http_request(url, method='GET', body=None, headers={}, cert_file=None, timeout=None,
        raise_on_http_errors=False):
    """Perform an HTTP(S) request with the requests library.

    Parameters:
      url -- absolute http:// or https:// URL.
      method -- HTTP method name.
      body -- request payload, passed as ``data`` to requests.
      headers -- request headers (the default dictionary is never modified
          here).
      cert_file -- client certificate, passed as ``cert`` to requests.
      timeout -- timeout in seconds, settings.REQUESTS_TIMEOUT when not set.
      raise_on_http_errors -- raise ConnectionError on non 2xx statuses.

    Returns a (response, status, data, auth_header) tuple, where data is the
    response content and auth_header the WWW-Authenticate response header.

    Raises ConnectionError on unsupported scheme, timeout, request failure
    and, if asked for, on HTTP error statuses.
    """
    get_publisher().reload_cfg()

    if url.startswith('http://'):
        hostname = urllib.splithost(url[5:])[0]
    elif url.startswith('https://'):
        hostname = urllib.splithost(url[6:])[0]
    else:
        raise ConnectionError('invalid scheme in URL %s' % url)

    if '@' in hostname:
        # Keep credentials out of error messages.  Split on the last "@" as
        # the password part may contain that character.
        # NOTE(review): the credentials are not passed explicitly, the URL is
        # handed unchanged to requests, which is expected to pick them up
        # from there -- confirm.
        hostname = hostname.rsplit('@', 1)[1]

    timeout = timeout or settings.REQUESTS_TIMEOUT

    try:
        response = requests.request(method, url, headers=headers, data=body,
                timeout=timeout, cert=cert_file, proxies=settings.REQUESTS_PROXIES)
    except requests.Timeout:
        raise ConnectionError('connection timed out while fetching the page')
    except requests.RequestException as err:
        raise ConnectionError('error in HTTP request to %s (%s)' % (hostname, err))
    else:
        data = response.content
        status = response.status_code
        auth_header = response.headers.get('WWW-Authenticate')

    if raise_on_http_errors and not (200 <= status < 300):
        raise ConnectionError(
            'error in HTTP request to %s (status: %s)' % (hostname, status))

    return response, status, data, auth_header
def urlopen(url, data=None):
    """Fetch *url* and return its content as a file-like object.

    A GET request is made when *data* is None, a POST with *data* as body
    otherwise.  HTTP error statuses raise ConnectionError.
    """
    method = 'POST' if data is not None else 'GET'
    result = _http_request(url, method, body=data, raise_on_http_errors=True)
    # result is (response, status, content, auth_header)
    return StringIO(result[2])
def http_get_page(url, **kwargs):
    """Run _http_request() on *url* with its default method (GET)."""
    result = _http_request(url, **kwargs)
    return result
def http_patch_request(url, body=None, **kwargs):
    """Run _http_request() on *url* with the PATCH method and *body*."""
    return _http_request(url, method='PATCH', body=body, **kwargs)
def http_post_request(url, body=None, **kwargs):
    """Run _http_request() on *url* with the POST method and *body*."""
    return _http_request(url, method='POST', body=body, **kwargs)
def http_delete_request(url, **kwargs):
    """Run _http_request() on *url* with the DELETE method."""
    return _http_request(url, method='DELETE', **kwargs)
def get_variadic_url(url, variables, encode_query=True):
    """Substitute template variables in *url*.

    URLs that are not template strings are returned unchanged.  URLs using
    the Django syntax ("{{" or "{%") are rendered as a whole; other template
    URLs are handled as ezt templates ("[variable]"), with each URL part
    substituted separately.

    Parameters:
      url -- the URL, possibly containing template markup.
      variables -- substitution variables.
      encode_query -- for ezt URLs whose query string is made of key/value
          pairs, urlencode the substituted pairs (default); when false they
          are joined as-is.
    """
    # NOTE(review): the nesting of this function was rebuilt from its syntax;
    # the level of the three "if path" statements closing the path handling
    # should be checked against the reference file.
    if not Template.is_template_string(url):
        return url
    # django template
    if '{{' in url or '{%' in url:
        try:
            url = Template(url).render(variables)
            p = urlparse.urlsplit(url)
            scheme, netloc, path, query, fragment = (
                p.scheme, p.netloc, p.path, p.query, p.fragment)
            if path.startswith('//'):
                # don't let double slash happen at the root of the URL, this
                # happens when a template such as {{url}}/path is used (with
                # {{url}} already ending with a slash).
                path = path[1:]
            return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
        except (TemplateSyntaxError, VariableDoesNotExist):
            # the template could not be rendered, give the URL back as is
            return url
    # ezt template, try to be safe
    def ezt_substitute(template, variables):
        # render an ezt template string and return the result as a string
        tmpl = ezt.Template()
        tmpl.parse(template)
        fd = StringIO()
        tmpl.generate(fd, variables)
        return fd.getvalue()
    def partial_quote(string):
        # unquote brackets, as there may be further processing that needs them
        # intact.
        return quote(string).replace('%5B', '[').replace('%5D', ']')
    p = urlparse.urlsplit(url)
    scheme, netloc, path, query, fragment = \
        p.scheme, p.netloc, p.path, p.query, p.fragment
    if netloc and '[' in netloc:
        netloc = ezt_substitute(netloc, variables)
    if path and '[' in path:
        if scheme == '' and netloc == '':
            # this happened because the variable was set in the scheme
            # (ex: http[https]://www.example.net) or because the value starts
            # with a variable name (ex: [url]); in that situation we do not
            # quote at all.
            if path.count('//') == 1:
                # there were no / in the original path (the two / comes from
                # the scheme/netloc separation, this means there is no path)
                before_path = ezt_substitute(path, variables)
                p2 = urlparse.urlsplit(before_path)
                scheme, netloc, path = p2.scheme, p2.netloc, p2.path
            else:
                # there is a path, we need to get back to the original URL and
                # split it on the last /, to isolate the path part.
                lastslash = '/' if path.endswith('/') else ''
                if '/' in path:
                    before_path, path = path.rsplit('/', 1)
                else:
                    before_path, path = path, ''
                before_path = ezt_substitute(before_path, variables)
                p2 = urlparse.urlsplit(before_path)
                scheme, netloc = p2.scheme, p2.netloc
                if p2.path:
                    # the substituted value brought its own path (and maybe
                    # query string), merge them with the ones of the template
                    if not path:
                        path, query2 = p2.path + lastslash, p2.query
                    else:
                        path, query2 = p2.path + '/' + path, p2.query
                    if query and query2:
                        query += '&' + query2
                    else:
                        query = query or query2
        if path:
            path = partial_quote(ezt_substitute(path, variables))
        if not path:
            path = '/'
        if path.startswith('//'):
            # same double slash protection as for django templates
            path = path[1:]
    if fragment and '[' in fragment:
        fragment = partial_quote(ezt_substitute(fragment, variables))
    if query and '[' in query:
        p_qs = urlparse.parse_qsl(query)
        if len(p_qs) == 0:
            # this happened because the query string has no key/values,
            # probably because it's a single substitution variable (ex:
            # http://www.example.net/foobar?[query])
            query = ezt_substitute(query, variables)
        else:
            # substitute keys and values one by one
            query = []
            for k, v in p_qs:
                if '[' in k:
                    k = ezt_substitute(k, variables)
                if '[' in v:
                    v = ezt_substitute(v, variables)
                query.append((k, v))
            if encode_query:
                query = urlencode(query)
            else:
                query = '&'.join('%s=%s' % (k,v) for (k,v) in query)
    return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
def get_foreground_colour(background_colour):
    """Return 'black' or 'white', whichever suits the given background.

    *background_colour* is made of six hexadecimal digits; 'black' is
    returned when its luminance is over 128, 'white' otherwise.
    """
    red, green, blue = [int(background_colour[start:start + 2], 16) for start in (0, 2, 4)]
    # luminance coefficients taken from section C-9 from
    # http://www.faqs.org/faqs/graphics/colorspace-faq/
    luminance = red * 0.212671 + green * 0.715160 + blue * 0.072169
    return 'black' if luminance > 128 else 'white'
def C_(msg):
    '''Translates and removes context from message

    The message looks like "context|text"; only the text following the first
    "|" of the translation is returned.  A translation that does not repeat
    the context prefix is returned whole, rather than raising IndexError.
    '''
    return _(msg).split('|', 1)[-1]
def indent_xml(elem, level=0):
    """Indent an element tree in place and return *elem*.

    Whitespace-only (or missing) text and tail attributes are replaced by a
    newline followed by the indentation of the given *level*.

    in-place prettyprint formatter, after
    http://effbot.org/zone/element-lib.htm#prettyprint
    """
    i = "\n" + level * " "
    if len(elem):
        if not elem.text or not elem.text.strip():
            elem.text = i + " "
        # a dedicated loop variable is used so that *elem* keeps referring to
        # the element received, which is the one to return (reusing the name
        # made the function return the last child instead).
        last_child = None
        for child in elem:
            indent_xml(child, level + 1)
            last_child = child
        # the closing tag of elem comes after the last child, so the tail of
        # that child takes the indentation of elem.
        if not last_child.tail or not last_child.tail.strip():
            last_child.tail = i
    elif level and (not elem.tail or not elem.tail.strip()):
        elem.tail = i
    return elem
class JSONEncoder(json.JSONEncoder):
    """JSON encoder with support for dates, decimals and file-like objects."""

    def default(self, obj):
        """Return a JSON serializable version of *obj*.

        time tuples and datetimes are given in ISO format, dates as
        YYYY-MM-DD, decimals as strings, and objects with a base_filename
        attribute as a dictionary with their base64 encoded content.
        """
        if isinstance(obj, time.struct_time):
            timestamp = time.mktime(obj)
            return datetime.datetime.utcfromtimestamp(timestamp).isoformat() + 'Z'

        # datetime is checked first as it is also a date
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()

        if isinstance(obj, datetime.date):
            return obj.strftime('%Y-%m-%d')

        if isinstance(obj, decimal.Decimal):
            return str(obj)

        if hasattr(obj, 'base_filename'):
            file_description = {
                'filename': obj.base_filename,
                'content_type': obj.content_type or 'application/octet-stream',
                'content': base64.b64encode(obj.get_content()),
            }
            return file_description

        # Let the base class default method raise the TypeError
        return super(JSONEncoder, self).default(obj)
def json_encode_helper(d, charset):
    '''Encode a JSON structure into local charset

    Unicode strings are encoded using *charset*, lists and dictionaries
    (keys and values) are processed recursively, other values are returned
    unchanged.
    '''
    if isinstance(d, unicode):
        return d.encode(charset)
    if isinstance(d, list):
        return [json_encode_helper(item, charset) for item in d]
    if isinstance(d, dict):
        return {
            json_encode_helper(key, charset): json_encode_helper(value, charset)
            for key, value in d.iteritems()
        }
    return d
def json_loads(value, charset=None):
    """Parse the JSON string *value*, with strings encoded in *charset*.

    When *charset* is not given, the site charset is used, or 'utf-8' when
    there is no publisher or no site charset.
    """
    # only compute a default when the caller did not ask for a charset (the
    # parameter used to be silently overwritten).
    if charset is None:
        charset = (get_publisher() and get_publisher().site_charset) or 'utf-8'
    return json_encode_helper(json.loads(value), charset)
def can_decorate_as_pdf():
    """Tell whether the phantomjs binary exists at /usr/bin/phantomjs."""
    phantomjs_path = '/usr/bin/phantomjs'
    return os.path.exists(phantomjs_path)
def decorate_as_pdf(content):
    """Render *content* within the site template and return it as PDF data.

    The HTML page is written to a temporary file that is then processed by
    phantomjs with the print-html-as-pdf.js script; the PDF is read from the
    path of the HTML file with a '.pdf' suffix added.  Temporary files are
    removed in all cases, including when phantomjs fails.

    Raises subprocess.CalledProcessError when phantomjs exits with an error.
    """
    html_page = get_publisher().render_response(content)
    # add a base URL, for relative links to resolve, and the charset
    html_page = html_page.replace('<head>',
            '<head><base href="%s"><meta charset="%s">' %
            (get_publisher().get_frontoffice_url(),
             get_publisher().site_charset))
    tmpfile = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    html_path = tmpfile.name
    pdf_path = html_path + '.pdf'
    try:
        with tmpfile:
            tmpfile.write(html_page)
        phantomjs_script = os.path.join(get_publisher().DATA_DIR, 'print-html-as-pdf.js')
        subprocess.check_call(['phantomjs', '--ssl-protocol=any', phantomjs_script, html_path])
        # binary mode, a PDF file is not text
        with open(pdf_path, 'rb') as pdf_fd:
            return pdf_fd.read()
    finally:
        # the PDF file is missing when phantomjs failed
        for path in (pdf_path, html_path):
            if os.path.exists(path):
                os.unlink(path)
# Accepted JSONP callback names: dotted JavaScript identifiers.
_JSONP_CALLBACK_RE = re.compile(
    r'\A[A-Za-z_$][A-Za-z0-9_$]*(?:\.[A-Za-z_$][A-Za-z0-9_$]*)*\Z')


def json_response(data):
    """Serialize *data* as JSON and set the matching response headers.

    The content type is set to application/json and, when the request has an
    Origin header, CORS headers allowing that origin are added.  If the
    request form has a 'jsonpCallback' or 'callback' variable, the result is
    a JSONP call of that function, served as application/javascript.

    The callback name comes from the request and ends up in a script, it is
    therefore only honoured when it is a plain (possibly dotted) JavaScript
    identifier; anything else gives the regular JSON response.
    """
    get_response().set_content_type('application/json')
    if get_request().get_environ('HTTP_ORIGIN'):
        get_response().set_header('Access-Control-Allow-Origin',
                get_request().get_environ('HTTP_ORIGIN'))
        get_response().set_header('Access-Control-Allow-Credentials', 'true')
        get_response().set_header('Access-Control-Allow-Headers', 'x-requested-with')
    json_str = json.dumps(data)
    for variable in ('jsonpCallback', 'callback'):
        if variable in get_request().form:
            # same string conversion as the interpolation done below
            callback = '%s' % (get_request().form[variable],)
            if _JSONP_CALLBACK_RE.match(callback):
                get_response().set_content_type('application/javascript')
                json_str = '%s(%s);' % (callback, json_str)
            # the first variable found is the only one considered
            break
    return json_str
def parse_isotime(s):
    """Return the UTC timestamp of a 'YYYY-MM-DDTHH:MM:SSZ' string."""
    return calendar.timegm(time.strptime(s, '%Y-%m-%dT%H:%M:%SZ'))
def file_digest(content, chunk_size=100000):
    """Return the hexadecimal SHA-256 digest of a file-like object.

    *content* is rewound, then read by blocks of *chunk_size*.
    """
    digest = hashlib.sha256()
    content.seek(0)
    while True:
        chunk = content.read(chunk_size)
        # stop on any empty read: an empty byte string is not equal to ''
        # under Python 3, comparing to a '' sentinel would never end.
        if not chunk:
            break
        digest.update(chunk)
    return digest.hexdigest()
def can_thumbnail(content_type):
    """Tell whether a thumbnail can be made for *content_type*.

    PDF files require both the "gm" binary and the imaging library, images
    only require the imaging library; other types are not supported.
    """
    if content_type == 'application/pdf':
        return bool(HAS_GM and Image)
    is_image = bool(content_type) and content_type.startswith('image/')
    return is_image and Image is not None
def _apply_exif_orientation(image, orientation):
    """Return *image* transformed according to the EXIF *orientation* value."""
    # orientation code from sorl.thumbnail (engines/pil_engine.py)
    if orientation == 2:
        return image.transpose(Image.FLIP_LEFT_RIGHT)
    if orientation == 3:
        return image.rotate(180)
    if orientation == 4:
        return image.transpose(Image.FLIP_TOP_BOTTOM)
    if orientation == 5:
        return image.rotate(-90, expand=1).transpose(Image.FLIP_LEFT_RIGHT)
    if orientation == 6:
        return image.rotate(-90, expand=1)
    if orientation == 7:
        return image.rotate(90, expand=1).transpose(Image.FLIP_LEFT_RIGHT)
    if orientation == 8:
        return image.rotate(90, expand=1)
    # other values (including a missing tag): nothing to do
    return image


def get_thumbnail(filepath, content_type=None):
    """Return a PNG thumbnail (at most 500x300) of the given file.

    PDF files are first converted to a PNG image with "gm convert"; other
    files are opened as images and rotated according to their EXIF
    orientation, if any.

    Raises ThumbnailError when the content type is not supported, when the
    PDF conversion fails or when the image cannot be processed.
    """
    if not can_thumbnail(content_type or ''):
        raise ThumbnailError()

    if content_type == 'application/pdf':
        try:
            fp = StringIO(subprocess.check_output(
                ['gm', 'convert', '-geometry', '500x', 'pdf:%s' % filepath, 'png:-']))
        except subprocess.CalledProcessError:
            raise ThumbnailError()
    else:
        # binary mode, image files are not text
        fp = open(filepath, 'rb')

    try:
        image = Image.open(fp)
        try:
            exif = image._getexif()
        except Exception:
            # no usable EXIF data (ex: image format without such a method)
            exif = None

        if exif:
            image = _apply_exif_orientation(image, exif.get(EXIF_ORIENTATION))

        image.thumbnail((500, 300))
        image_thumb_fp = StringIO()
        image.save(image_thumb_fp, "PNG")
    except IOError:
        # failed to create thumbnail.
        raise ThumbnailError()
    finally:
        # do not leave the source file open
        fp.close()

    return image_thumb_fp.getvalue()
def normalize_geolocation(lat_lon):
    '''Fit lat into -90/90 and lon into -180/180

    *lat_lon* is a dictionary with 'lat' and 'lon' keys; a new dictionary
    with both values wrapped into their range, as floats, is returned.
    '''
    def wrap(value, lower, upper):
        span = upper - lower
        # the remainder of a Decimal has the sign of the dividend, hence the
        # second modulo to get a positive offset
        offset = ((value - lower) % span + span) % span
        return offset + lower

    ranges = {
        'lat': ('-90.0', '90.0'),
        'lon': ('-180.0', '180.0'),
    }
    normalized = {}
    for key in ('lat', 'lon'):
        lower, upper = [decimal.Decimal(bound) for bound in ranges[key]]
        normalized[key] = float(wrap(decimal.Decimal(lat_lon[key]), lower, upper))
    return normalized
def html2text(text):
    """Return *text* without HTML tags and with HTML entities replaced.

    The result is a byte string in the site charset.
    """
    if isinstance(text, (htmltext, str)):
        # byte strings are in the site charset, work on unicode
        text = unicode(str(text), get_publisher().site_charset)
    parser = HTMLParser()
    plain_text = parser.unescape(strip_tags(text))
    return site_encode(plain_text)