wcs/wcs/qommon/templatetags/qommon.py

989 lines
26 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# w.c.s. - web application for online forms
# Copyright (C) 2005-2017 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import datetime
import hashlib
import io
import math
import os
import random
import string
import unicodedata
import urllib.parse
from decimal import Decimal
from decimal import DivisionByZero as DecimalDivisionByZero
from decimal import InvalidOperation as DecimalInvalidOperation
import pyproj
try:
import qrcode
except ImportError:
qrcode = None
from pyproj import Geod
from quixote import get_publisher, get_response
try:
import langdetect
from langdetect.lang_detect_exception import LangDetectException
except ImportError:
langdetect = None
from django import template
from django.template import defaultfilters
from django.utils import dateparse
from django.utils.encoding import force_bytes, force_text
from django.utils.safestring import mark_safe
from django.utils.timezone import is_naive, make_aware
from wcs.qommon import calendar, evalutils, tokens, upload_storage
from wcs.qommon.admin.texts import TextsDirectory
from wcs.qommon.humantime import seconds2humanduration
from wcs.qommon.misc import validate_phone_fr
register = template.Library()
def unlazy(x):
return x.get_value() if hasattr(x, 'get_value') else x
@register.filter
def get(mapping, key):
mapping = unlazy(mapping)
key = unlazy(key)
if hasattr(mapping, 'get'):
return mapping.get(key)
if isinstance(mapping, (tuple, list)):
try:
key = int(key)
except (TypeError, ValueError):
pass
try:
return mapping[key]
except (TypeError, IndexError, KeyError):
return None
@register.filter
def getlist(mapping, key):
if mapping is None or not hasattr(mapping, 'getlist'):
return []
return mapping.getlist(key)
@register.filter
def getlistdict(mapping, keys):
if mapping is None or not hasattr(mapping, 'getlistdict'):
return []
parsed_keys = {}
for key in unlazy(keys).split(','):
if not key.strip():
continue
try:
name, new_name = key.split(':', 1)
except ValueError:
name = new_name = key
parsed_keys[name.strip()] = new_name.strip()
results = mapping.getlistdict(parsed_keys.keys())
return [{parsed_keys[k]: v for k, v in result.items()} for result in results]
@register.filter
def startswith(string, substring):
return string and force_text(string).startswith(force_text(substring))
@register.filter
def endswith(string, substring):
return string and force_text(string).endswith(force_text(substring))
@register.filter
def split(string, separator=' '):
if not string:
return []
return force_text(string).split(force_text(separator))
@register.filter
def strip(string, chars=None):
if not string:
return ''
if chars:
return force_text(string).strip(force_text(chars))
else:
return force_text(string).strip()
@register.filter
def removeprefix(string, prefix=None):
if not string:
return ''
value = force_text(string)
prefix = force_text(prefix)
if prefix and value.startswith(prefix):
return value[len(prefix) :]
return value
@register.filter
def removesuffix(string, suffix=None):
if not string:
return ''
value = force_text(string)
suffix = force_text(suffix)
if suffix and value.endswith(suffix):
return value[: -len(suffix)]
return value
@register.filter
def urljoin(base, path=None):
return urllib.parse.urljoin(base or '', path or '')
@register.filter
def unaccent(value):
value = unlazy(value)
if not value:
return ''
return force_text(unicodedata.normalize('NFKD', value).encode('ascii', 'ignore'))
@register.filter
def parse_date(date_string):
try:
return evalutils.make_date(date_string)
except ValueError:
pass
# fallback to Django function
try:
return dateparse.parse_date(date_string)
except (ValueError, TypeError):
return None
@register.filter(expects_localtime=True, is_safe=False)
def date(value, arg=None):
value = unlazy(value)
if arg is None:
value = parse_date(value)
if not value:
return ''
from wcs.variables import lazy_date
return lazy_date(parse_date(value))
if not isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
value = parse_datetime(value) or parse_date(value)
try:
return defaultfilters.date(value, arg=arg)
except NotImplementedError:
# Django raise it on bad date format
return ''
@register.filter
def parse_datetime(datetime_string):
try:
return evalutils.make_datetime(datetime_string)
except ValueError:
pass
# fallback to Django function
try:
return dateparse.parse_datetime(datetime_string)
except (ValueError, TypeError):
return None
@register.filter(name='datetime', expects_localtime=True, is_safe=False)
def datetime_(value, arg=None):
value = unlazy(value)
if arg is None:
value = parse_datetime(value)
if not value:
return ''
from wcs.variables import lazy_date
return lazy_date(parse_datetime(value))
if not isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
value = parse_datetime(value)
return defaultfilters.date(value, arg=arg)
@register.filter
def parse_time(time_string):
# if input is a datetime, extract its time
try:
dt = parse_datetime(time_string)
if dt:
return dt.time()
except (ValueError, TypeError):
pass
# fallback to Django function
try:
return dateparse.parse_time(time_string)
except (ValueError, TypeError):
return None
@register.filter(expects_localtime=True, is_safe=False)
def time(value, arg=None):
value = unlazy(value)
if arg is None:
parsed = parse_time(value)
return parsed if parsed is not None else '' # because bool(midnight) == False
if not isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
value = parse_time(value)
return defaultfilters.date(value, arg=arg)
def parse_decimal(value, do_raise=False):
value = unlazy(value)
if isinstance(value, bool):
# treat all booleans as 0 (contrary to Python behaviour where
# decimal(True) == 1).
value = 0
if isinstance(value, str):
# replace , by . for French users comfort
value = value.replace(',', '.')
try:
return Decimal(value).quantize(Decimal('1.000000')).normalize()
except (ArithmeticError, TypeError):
if do_raise:
raise
return Decimal(0)
@register.filter(is_safe=False)
def decimal(value, arg=None):
if not isinstance(value, Decimal):
value = parse_decimal(value)
if arg is None:
return value
arg = unlazy(arg)
return defaultfilters.floatformat(value, arg=arg)
@register.filter(is_safe=False)
def duration(value, arg='short'):
if arg not in ('short', 'long'):
return ''
# value is expected to be a timedelta or a number of seconds
value = unlazy(value)
arg = unlazy(arg)
if not isinstance(value, datetime.timedelta):
try:
value = datetime.timedelta(seconds=int(value) * 60)
except (TypeError, ValueError):
return ''
return seconds2humanduration(int(value.total_seconds()), short=bool(arg != 'long'))
@register.filter(expects_localtime=True, is_safe=False)
def add_days(value, arg):
if hasattr(value, 'timetuple'):
# extract real value in case of lazy object
value = value.timetuple()
value = parse_date(value) # consider only date, not hours
if not value:
return ''
from wcs.variables import lazy_date
arg = parse_decimal(arg)
if not arg:
return lazy_date(value)
result = value + datetime.timedelta(days=float(arg))
if hasattr(result, 'date'):
result = result.date()
return lazy_date(result)
@register.filter(expects_localtime=True, is_safe=False)
def add_hours(value, arg):
if hasattr(value, 'timetuple'):
# extract real value in case of lazy object
value = value.timetuple()
value = parse_datetime(value)
if not value:
return ''
from wcs.variables import lazy_date
arg = parse_decimal(arg)
if not arg:
return lazy_date(value)
return lazy_date(value + datetime.timedelta(hours=float(arg)))
@register.filter(expects_localtime=True, is_safe=False)
def add_minutes(value, arg):
if hasattr(value, 'timetuple'):
# extract real value in case of lazy object
value = value.timetuple()
value = parse_datetime(value)
if not value:
return ''
from wcs.variables import lazy_date
arg = parse_decimal(arg)
if not arg:
return lazy_date(value)
return lazy_date(value + datetime.timedelta(minutes=float(arg)))
@register.filter(expects_localtime=True, is_safe=False)
def age_in_days(value, now=None):
try:
return evalutils.age_in_days(value, now)
except ValueError:
return ''
@register.filter(expects_localtime=True, is_safe=False)
def age_in_hours(value, now=None):
# consider value and now as datetimes (and not dates)
if hasattr(value, 'timetuple'):
# extract real value in case of lazy object
value = value.timetuple()
value = parse_datetime(value)
if not value:
return ''
if now is not None:
if hasattr(now, 'timetuple'):
now = now.timetuple()
now = parse_datetime(now)
if not now:
return ''
else:
now = datetime.datetime.now()
return int((now - value).total_seconds() / 3600)
@register.filter(expects_localtime=True, is_safe=False)
def age_in_years(value, today=None):
try:
return evalutils.age_in_years_and_months(value, today)[0]
except ValueError:
return ''
@register.filter(expects_localtime=True, is_safe=False)
def age_in_months(value, today=None):
try:
years, months = evalutils.age_in_years_and_months(value, today)
except ValueError:
return ''
return years * 12 + months
@register.filter(expects_localtime=True)
def datetime_in_past(value):
value = parse_datetime(value)
if not value:
return False
if is_naive(value):
value = make_aware(value)
date_now = make_aware(datetime.datetime.now())
return value <= date_now
@register.filter(expects_localtime=True)
def is_working_day(value, saturday_is_a_working_day=False):
value = parse_date(value)
if not value:
return False
cal = calendar.get_calendar(saturday_is_a_working_day=saturday_is_a_working_day)
if not cal:
return False
return cal.is_working_day(value)
@register.filter(expects_localtime=True)
def is_working_day_with_saturday(value):
return is_working_day(value, saturday_is_a_working_day=True)
@register.filter(expects_localtime=True)
def add_working_days(value, arg, saturday_is_a_working_day=False):
value = parse_date(value)
if not value:
return ''
cal = calendar.get_calendar(saturday_is_a_working_day=saturday_is_a_working_day)
if not cal:
return ''
try:
return cal.add_working_days(value, int(arg))
except ValueError:
return ''
@register.filter(expects_localtime=True)
def add_working_days_with_saturday(value, arg):
return add_working_days(value, arg, saturday_is_a_working_day=True)
@register.filter(expects_localtime=True)
def adjust_to_working_day(value, saturday_is_a_working_day=False):
value = parse_date(value)
if not value:
return ''
cal = calendar.get_calendar(saturday_is_a_working_day=saturday_is_a_working_day)
if not cal:
return ''
if cal.is_working_day(value):
return value
# return next working day
return cal.add_working_days(value, 1)
@register.filter(expects_localtime=True)
def adjust_to_working_day_with_saturday(value):
return adjust_to_working_day(value, saturday_is_a_working_day=True)
@register.filter(expects_localtime=True)
def age_in_working_days(value, arg=None, saturday_is_a_working_day=False):
value = parse_date(value)
if not value:
return ''
if arg:
arg = parse_date(arg)
if not arg:
return ''
else:
arg = datetime.datetime.now()
cal = calendar.get_calendar(saturday_is_a_working_day=saturday_is_a_working_day)
if not cal:
return ''
return cal.get_working_days_delta(value, arg)
@register.filter(expects_localtime=True)
def age_in_working_days_with_saturday(value, arg=None):
return age_in_working_days(value, arg, saturday_is_a_working_day=True)
@register.filter(expects_localtime=True)
def adjust_to_week_monday(value):
value = parse_date(unlazy(value))
if not value:
return ''
return value - datetime.timedelta(days=value.weekday())
@register.filter(expects_localtime=True)
def iterate_days_until(value, until):
value = parse_date(unlazy(value))
until = parse_date(unlazy(until))
if not (value and until):
return
while value < until:
yield value
value = value + datetime.timedelta(days=1)
yield value
@register.simple_tag
def standard_text(text_id):
return mark_safe(TextsDirectory.get_html_text(str(text_id)))
@register.simple_tag
def add_javascript(js_id):
get_response().add_javascript([js_id])
return ''
@register.simple_tag(takes_context=True)
def action_button(context, action_id, label, delay=3, message=None, done_message=None):
formdata_id = context.get('form_number_raw')
formdef_urlname = context.get('form_slug')
formdef_type = context.get('form_type')
if not (formdef_urlname and formdata_id):
return ''
token = tokens.Token(expiration_delay=delay * 86400, size=64)
token.type = 'action'
token.context = {
'form_slug': formdef_urlname,
'form_type': formdef_type,
'form_number_raw': formdata_id,
'action_id': action_id,
'label': label,
'message': message,
'done_message': done_message,
}
token.store()
return '---===BUTTON:%s:%s===---' % (token.id, label)
@register.filter
def add(term1, term2):
'''replace the "add" native django filter'''
# consider None content as the empty string
if term1 is None:
term1 = ''
if term2 is None:
term2 = ''
# return available number if the other term is the empty string
if term1 == '':
try:
return parse_decimal(term2, do_raise=True)
except (ArithmeticError, TypeError):
pass
if term2 == '':
try:
return parse_decimal(term1, do_raise=True)
except (ArithmeticError, TypeError):
pass
# compute addition if both terms are numbers
try:
return parse_decimal(term1, do_raise=True) + parse_decimal(term2, do_raise=True)
except (ArithmeticError, TypeError, ValueError):
pass
# fallback to django add filter
return defaultfilters.add(unlazy(term1), unlazy(term2))
@register.filter
def subtract(term1, term2):
return parse_decimal(term1) - parse_decimal(term2)
@register.filter
def multiply(term1, term2):
return parse_decimal(term1) * parse_decimal(term2)
@register.filter
def divide(term1, term2):
try:
return parse_decimal(term1) / parse_decimal(term2)
except DecimalInvalidOperation:
return ''
except DecimalDivisionByZero:
return ''
@register.filter
def modulo(term1, term2):
try:
return parse_decimal(term1) % parse_decimal(term2)
except DecimalInvalidOperation:
return ''
except DecimalDivisionByZero:
return ''
@register.filter(name='sum')
def sum_(list_):
list_ = unlazy(list_)
if isinstance(list_, str):
# do not consider string as iterable, to avoid misusage
return ''
try:
return sum(parse_decimal(term) for term in list_)
except TypeError: # list_ is not iterable
return ''
@register.filter
def ceil(value):
'''the smallest integer value greater than or equal to value'''
return decimal(math.ceil(parse_decimal(value)))
@register.filter
def floor(value):
return decimal(math.floor(parse_decimal(value)))
@register.filter(name='abs')
def abs_(value):
return decimal(abs(parse_decimal(value)))
@register.simple_tag
def version_hash():
from wcs.qommon.admin.menu import get_vc_version
return hashlib.md5(force_bytes(get_vc_version())).hexdigest()
def generate_token(alphabet, length):
r = random.SystemRandom()
return ''.join([r.choice(alphabet) for i in range(length)])
@register.simple_tag
def token_decimal(length=6):
# entropy by default is log(10^6)/log(2) = 19.93 bits
# decimal always need more length than alphanum for the same security level
# for 128bits security level, length must be more than log(2^128)/log(10) = 38.53 digits
return generate_token(string.digits, length)
@register.simple_tag
def token_alphanum(length=4):
# use of a 28 characters alphabet using uppercase letters and digits but
# removing confusing characters and digits 0, O, 1 and I.
# entropy by default is log(28^4)/log(2) = 19.22 bits
# for 128 bits security level length must be more than log(2^128)/log(28) = 26.62 characters
return generate_token('23456789ABCDEFGHJKLMNPQRSTUVWXYZ', length)
@register.filter
def token_check(token1, token2):
return force_text(token1).strip().upper() == force_text(token2).strip().upper()
def get_latlon(obj):
if getattr(obj, 'geoloc', None):
if 'base' in obj.geoloc:
return obj.geoloc['base']['lat'], obj.geoloc['base']['lon']
return None, None
obj = unlazy(obj)
if isinstance(obj, dict) and 'lat' in obj and 'lon' in obj:
try:
return float(obj['lat']), float(obj['lon'])
except (TypeError, ValueError):
pass
if isinstance(obj, dict) and 'lat' in obj and 'lng' in obj:
try:
return float(obj['lat']), float(obj['lng'])
except (TypeError, ValueError):
pass
if isinstance(obj, str) and ';' in obj:
try:
return float(obj.split(';')[0]), float(obj.split(';')[1])
except ValueError:
pass
return None, None
@register.filter
def distance(obj1, obj2):
lat1, lon1 = get_latlon(obj1)
if lat1 is None or lon1 is None:
return None
lat2, lon2 = get_latlon(obj2)
if lat2 is None or lon2 is None:
return None
geod = Geod(ellps='WGS84')
distance = geod.inv(lon1, lat1, lon2, lat2)[2]
return distance
@register.filter
def set_geo_center(queryset, lazy_formdata):
return queryset.set_geo_center(lazy_formdata)
@register.filter
def distance_filter(queryset, distance=1000):
return queryset.distance_filter(distance=int(unlazy(distance)))
@register.filter
def same_user(queryset):
return queryset.same_user()
@register.filter
def exclude_self(queryset):
return queryset.exclude_self()
@register.filter
def current_user(queryset):
return queryset.current_user()
@register.filter
def filter_by_user(queryset, user):
return queryset.filter_by_user(unlazy(user))
@register.filter
def filter_by_status(queryset, status):
return queryset.filter_by_status(status)
@register.filter
def filter_by_internal_id(queryset, form_internal_id):
return queryset.filter_by_internal_id(unlazy(form_internal_id))
@register.filter
def filter_by_number(queryset, form_number):
return queryset.filter_by_number(form_number)
@register.filter
def pending(queryset):
return queryset.pending()
@register.filter
def done(queryset):
return queryset.done()
@register.filter
def objects(forms_source, slug):
# assume formdef_source is an instance of CardsSource of FormsSource
return getattr(forms_source, unlazy(slug)).objects
@register.filter
def with_custom_view(queryset, custom_view_slug):
return queryset.with_custom_view(custom_view_slug)
@register.filter
def order_by(queryset, attribute):
return queryset.order_by(unlazy(attribute))
@register.filter
def filter_by(queryset, attribute):
return queryset.filter_by(unlazy(attribute))
@register.filter
def filter_value(queryset, value):
return queryset.apply_filter_value(unlazy(value))
@register.filter
def exclude_value(queryset, value):
return queryset.apply_exclude_value(unlazy(value))
@register.filter
def count(queryset):
if hasattr(queryset, '__len__'):
# don't unlazy if object has native __len__ support, this is required
# for blocks as unlazying would give {'data': ..., 'schema': ...} and
# the length would always be 2.
return len(queryset)
queryset = unlazy(queryset)
if queryset is None:
return 0
return len(queryset)
@register.filter
def reproj(coords, projection_name):
proj = pyproj.Proj(init='EPSG:4326')
target_proj = pyproj.Proj(init=projection_name)
return pyproj.transform(proj, target_proj, coords['lon'], coords['lat'])
@register.filter
def has_role(user, role_name):
if not callable(getattr(user, 'get_roles', None)):
# do not fail on non-user objects, just return False
return False
for role_id in user.get_roles():
try:
if role_name == get_publisher().role_class.get(role_id).name:
return True
except KeyError: # role has been deleted
pass
return False
@register.filter
def roles(user):
if not callable(getattr(user, 'get_roles', None)):
# do not fail on non-user objects, just return empty list
return []
role_ids = user.get_roles()
roles = [get_publisher().role_class.get(x, ignore_errors=True, ignore_migration=True) for x in role_ids]
return [x.name for x in roles if x]
@register.filter
def language_detect(value):
if langdetect is None:
return ''
try:
return langdetect.detect(str(value))
except LangDetectException:
return ''
@register.filter(is_safe=False)
def phonenumber_fr(value, separator=' '):
DROMS = ('262', '508', '590', '594', '596')
value = unlazy(value)
if not value or not isinstance(value, str):
return value
number = value.strip()
if not number:
return value
if number[0] == '+':
international = '+'
number = '00' + number[1:]
else:
international = '00' + separator
number = ''.join(c for c in number if c in '0123456789')
def in_pairs(num):
return separator.join(num[i * 2 : i * 2 + 2] for i in range(len(num) // 2))
# local number
if len(number) == 10 and number[0] == '0' and number[1] in '123456789':
return in_pairs(number)
# international
if len(number) == 14 and number[0:5] == '00330':
# +/00 33 (0)x xx xx xx xx : remove (0)
number = number[0:4] + number[5:]
if len(number) == 13 and number[0:4] == '0033':
return international + '33' + separator + number[4] + separator + in_pairs(number[5:])
if len(number) == 11 and number[0:2] == '00' and number[2:5] in DROMS:
return international + number[2:5] + separator + in_pairs(number[5:])
# unknown
return value
@register.filter(is_safe=True)
def is_french_mobile_phone_number(value):
value = unlazy(value)
if not value:
return False
value = value.strip().replace(' ', '')
if not validate_phone_fr(value):
return False
return value.startswith('06') or value.startswith('07')
@register.filter
def is_empty(value):
from wcs.variables import LazyFormDefObjectsManager, LazyList
value = unlazy(value)
if isinstance(value, (str, list, dict)):
return not value
if isinstance(value, (LazyFormDefObjectsManager, LazyList)):
return not list(value)
return value is None
@register.filter
def strip_metadata(value):
return unlazy(value).strip_metadata()
@register.filter
def rename_file(value, new_name):
from wcs.fields import FileField
file_object = FileField.convert_value_from_anything(value)
if not file_object:
return None
if new_name.endswith('.$ext'):
new_name = os.path.splitext(new_name)[0] + os.path.splitext(file_object.base_filename)[1]
file_object.orig_filename = new_name
file_object.base_filename = new_name
return file_object
@register.filter
def first(value):
try:
return defaultfilters.first(value)
except TypeError:
return ''
@register.filter
def last(value):
try:
return defaultfilters.last(value)
except TypeError:
return ''
@register.filter(name='list')
def list_(value):
# turn a generator into a list
return list(unlazy(value))
@register.filter(name='qrcode')
def qrcode_filter(value, name=None):
if not qrcode:
return ''
if not isinstance(value, str):
return ''
img = qrcode.make(value)
buf = io.BytesIO()
img.save(buf)
upload = upload_storage.PicklableUpload(name or 'qrcode.png', 'image/png')
upload.receive([buf.getvalue()])
return upload
@register.simple_tag
def newline(windows=False):
return '\r\n' if windows else '\n'
@register.simple_tag(takes_context=True)
def block_value(context, append=False, merge=False, **kwargs):
# kwargs are varnames of block subfields
# * append=True will add a "row" to the block
# * merge=True will merge the value into an existing row of the block
# it can be True to alter the last row, or a row number (counting from 0).
# Both will create a row if there's no existing value.
from wcs.fields import BlockRowValue
value = BlockRowValue(append=append, merge=merge, **{k: unlazy(v) for k, v in kwargs.items()})
if context.get('allow_complex'):
return get_publisher().cache_complex_data(value, '<block value>')
return value # mostly non-useful
@register.filter
def as_template(value):
from wcs.workflows import WorkflowStatusItem
return WorkflowStatusItem.compute(unlazy(value))