# wcs/wcs/variables.py (606 lines, 18 KiB, Python)

# w.c.s. - web application for online forms
# Copyright (C) 2005-2018 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from django.utils.functional import SimpleLazyObject
from quixote import get_publisher
import qommon.misc
from qommon.evalutils import make_datetime
from qommon.templatetags.qommon import parse_datetime
class LazyFormDef(object):
    """Thin wrapper exposing selected attributes of a form definition."""

    def __init__(self, formdef):
        self._formdef = formdef

    @property
    def name(self):
        """The ``name`` attribute of the wrapped form definition."""
        wrapped = self._formdef
        return wrapped.name

    @property
    def slug(self):
        """The ``url_name`` attribute of the wrapped form definition."""
        wrapped = self._formdef
        return wrapped.url_name

    @property
    def objects(self):
        """A ``FormDefSubstVar`` built from the wrapped form definition."""
        # Imported at call time, not at module level.
        from wcs.formdef import FormDefSubstVar

        substvar = FormDefSubstVar(self._formdef)
        return substvar

    @property
    def option(self):
        """A ``LazyFormDefOption`` built from the wrapped form definition."""
        wrapped = self._formdef
        return LazyFormDefOption(wrapped)
class LazyFormDefOption(object):
    """Dictionary-like access to the workflow options of a form definition.

    The mapping is computed on first access to ``options`` and then kept on
    the instance.
    """

    # Cached mapping; None until ``options`` has been read once.
    _options = None

    def __init__(self, formdef):
        self._formdef = formdef

    @property
    def options(self):
        """Mapping of option names to values, keyed by field varname.

        For each field of the workflow's variables formdef that has a
        varname, the stored option value is exposed under that varname.
        When a display value is stored, it takes the place of the plain
        value, which moves to ``<varname>_raw``.  A stored structured value
        is exposed as ``<varname>_structured``.
        """
        if self._options is not None:
            return self._options

        # Bind the cache before filling it, so the same dict object is both
        # populated and remembered.
        resolved = self._options = {}

        variables_formdef = self._formdef.workflow.variables_formdef
        if not variables_formdef:
            return resolved
        stored = self._formdef.workflow_options
        if not stored:
            return resolved

        for field in variables_formdef.fields:
            varname = field.varname
            if not varname:
                continue
            resolved[varname] = stored.get(varname)

            if field.store_display_value and '%s_display' % varname in stored:
                resolved[varname + '_raw'] = resolved[varname]
                resolved[varname] = stored['%s_display' % varname]

            if field.store_structured_value and '%s_structured' % varname in stored:
                resolved[varname + '_structured'] = stored.get(
                    '%s_structured' % varname)

        return resolved

    def __getitem__(self, key):
        computed = self.options
        return computed[key]
class LazyFormData(LazyFormDef):
    """Property-based view over a formdata object and its form definition.

    Values can be read as attributes or through item access (see
    ``__getitem__``).
    """

    def __init__(self, formdata):
        super(LazyFormData, self).__init__(formdata.formdef)
        self._formdata = formdata

    @property
    def formdef(self):
        """A new ``LazyFormDef`` around the formdata's form definition."""
        definition = self._formdata.formdef
        return LazyFormDef(definition)

    @property
    def internal_id(self):
        formdata = self._formdata
        return formdata.id

    @property
    def receipt_date(self):
        """Receipt time rendered with ``qommon.misc.date_format()``."""
        date_format = qommon.misc.date_format()
        return qommon.misc.strftime(date_format, self._formdata.receipt_time)

    @property
    def receipt_time(self):
        """Receipt time rendered as hours and minutes."""
        moment = self._formdata.receipt_time
        return qommon.misc.strftime('%H:%M', moment)

    @property
    def number(self):
        formdata = self._formdata
        return formdata.get_display_id()

    @property
    def number_raw(self):
        """The formdata id as a string, or None when the id is falsy."""
        if self._formdata.id:
            return str(self._formdata.id)
        return None

    @property
    def url(self):
        formdata = self._formdata
        return formdata.get_url()

    @property
    def url_backoffice(self):
        formdata = self._formdata
        return formdata.get_url(backoffice=True)

    @property
    def backoffice_url(self):
        # Same value as url_backoffice, under a second name.
        formdata = self._formdata
        return formdata.get_url(backoffice=True)

    @property
    def uri(self):
        """Relative ``<form url_name>/<formdata id>/`` path."""
        parts = (self._formdef.url_name, self._formdata.id)
        return '%s/%s/' % parts

    @property
    def criticality_level(self):
        formdata = self._formdata
        return formdata.criticality_level

    @property
    def criticality_label(self):
        """Name of the criticality level object, or None on IndexError."""
        try:
            label = self._formdata.get_criticality_level_object().name
        except IndexError:
            return None
        else:
            return label

    @property
    def digest(self):
        formdata = self._formdata
        return formdata.digest

    @property
    def receipt_datetime(self):
        """``make_datetime`` of the receipt time, or None when it is unset."""
        if self._formdata.receipt_time:
            return make_datetime(self._formdata.receipt_time)
        return None

    @property
    def status(self):
        formdata = self._formdata
        return formdata.get_status_label()

    @property
    def status_is_endpoint(self):
        formdata = self._formdata
        return formdata.is_at_endpoint_status()

    @property
    def tracking_code(self):
        """Tracking code of the formdata.

        When the formdata has no status but has data, the code may instead
        come from the data itself (``future_tracking_code``) or from the
        formdata referenced by ``draft_formdata_id``.
        """
        source = self._formdata
        if source.status or not source.data:
            return source.tracking_code

        if 'future_tracking_code' in source.data:
            return source.data['future_tracking_code']
        if 'draft_formdata_id' in source.data:
            draft_id = source.data['draft_formdata_id']
            source = source.formdef.data_class().get(draft_id)
        return source.tracking_code

    @property
    def submission_backoffice(self):
        formdata = self._formdata
        return formdata.backoffice_submission

    @property
    def submission_channel(self):
        formdata = self._formdata
        return formdata.submission_channel

    @property
    def submission_channel_label(self):
        formdata = self._formdata
        return formdata.get_submission_channel_label()

    @property
    def submission_context(self):
        formdata = self._formdata
        return formdata.submission_context

    @property
    def status_url(self):
        """The formdata URL with ``status`` appended."""
        base_url = self._formdata.get_url()
        return '%sstatus' % base_url

    @property
    def details(self):
        return self._formdef.get_detailed_email_form(
            self._formdata, self._formdata.get_url())

    @property
    def user(self):
        """A ``LazyUser`` for the formdata's user, or None without one."""
        owner = self._formdata.get_user()
        if owner:
            return LazyUser(owner)
        return None

    @property
    def var(self):
        all_fields = self._formdef.get_all_fields()
        return LazyFormDataVar(all_fields, self._formdata.data, self._formdata)

    @property
    def field(self):
        """Plain dictionary of field values keyed by simplified label.

        Only fields that have a ``get_view_value`` attribute are included.
        Missing values become empty strings.
        """
        # no lazy dictionary here as it's legacy.
        legacy = {}
        for field in self._formdef.get_all_fields():
            if hasattr(field, 'get_view_value'):
                value = self._formdata.data.get(field.id)
                if value is None:
                    value = ''
                elif field.convert_value_to_str:
                    value = field.convert_value_to_str(value)
                legacy[qommon.misc.simplify(field.label, space='_')] = value
        return legacy

    @property
    def role(self):
        """Substitution variables of the workflow roles, merged in one dict.

        Roles set on the formdata override those of the form definition;
        roles whose lookup raises KeyError are skipped.
        """
        from wcs.roles import Role

        merged_roles = {}
        for holder in (self._formdef, self._formdata):
            if holder.workflow_roles:
                merged_roles.update(holder.workflow_roles)

        variables = {}
        for role_type, role_id in merged_roles.items():
            normalized = role_type.replace('-', '_').strip('_')
            prefix = '%s_' % normalized
            try:
                variables.update(Role.get(role_id).get_substitution_variables(prefix))
            except KeyError:
                pass
        return variables

    @property
    def comment(self):
        """Comment of the last evolution entry, or an empty string."""
        history = self._formdata.evolution
        if not history:
            return ''
        if not history[-1].comment:
            return ''
        return self._formdata.evolution[-1].comment

    @property
    def page_no(self):
        current_page = self._formdata.page_no
        return int(current_page)

    @property
    def attachments(self):
        # Imported at call time, not at module level.
        from wcs.workflows import AttachmentsSubstitutionProxy

        proxy = AttachmentsSubstitutionProxy(self._formdata)
        return proxy

    @property
    def geoloc(self):
        """Geolocations keyed by name, plus ``<name>_lat`` / ``<name>_lon``."""
        flattened = {}
        if not self._formdata.geolocations:
            return flattened
        for name, position in self._formdata.geolocations.items():
            flattened[name] = position
            flattened[name + '_lat'] = position.get('lat')
            flattened[name + '_lon'] = position.get('lon')
        return flattened

    @property
    def previous_status(self):
        """Label of the most recent evolution status unlike the current one.

        Returns an empty string when there is no such status.
        """
        if not self._formdata.evolution:
            return ''
        for step in reversed(self._formdata.evolution):
            if not step.status:
                continue
            if step.status == self._formdata.status:
                continue
            return self._formdata.get_status_label(step.status)
        return ''

    @property
    def status_changed(self):
        current_label = self.status
        return current_label != self.previous_status

    @property
    def evolution(self):
        formdata = self._formdata
        return self._formdef.get_detailed_evolution(formdata)

    def export_to_json(self, include_files=True):
        # this gets used to generate an email attachment :/
        formdata = self._formdata
        return formdata.export_to_json(include_files=include_files)

    def __getitem__(self, key):
        """Item access mapped onto attributes, with an ``f<id>`` fallback.

        A key that is not an attribute but starts with ``f`` is matched
        against field ids (the part after the ``f``) and the stored value of
        the matching field is returned.  Otherwise the original
        AttributeError is re-raised.
        """
        try:
            return getattr(self, key)
        except AttributeError:
            if key.startswith('f'):
                wanted_id = str(key[1:])
                for field in self._formdef.get_all_fields():
                    if str(field.id) == wanted_id:
                        return self._formdata.data.get(field.id)
            raise
class LazyFormDataVar(object):
    """Access to field values by varname, as items or as attributes.

    ``fields`` is an iterable of field objects (``varname``, ``id`` and
    ``key`` attributes are read), ``data`` maps field ids to stored values
    (None is treated as an empty mapping) and ``formdata`` is handed over to
    the lazy value wrappers.
    """

    # Cached varname -> field mapping; None until ``varnames`` is first read.
    _varnames = None

    def __init__(self, fields, data, formdata=None):
        self._fields = fields
        self._data = data or {}
        self._formdata = formdata

    @property
    def varnames(self):
        """Mapping of varname to field, for fields that have a varname."""
        if self._varnames is None:
            self._varnames = {
                field.varname: field for field in self._fields if field.varname
            }
        return self._varnames

    def __getitem__(self, key):
        """Return the value for varname ``key``.

        Returns None when the stored value is None, otherwise a lazy wrapper
        matching the field type.  Raises KeyError for unknown keys.
        """
        try:
            field = self.varnames[key]
        except KeyError:
            # key was unknown but for some values we may still have to provide
            # multiple keys (for example file fields will expect to have both
            # form_var_foo and form_var_foo_raw set to None) and user
            # conditions may use the "is" operator that cannot be overridden
            # (this applies to None as well as boolean values).
            #
            # Therefore we catch unknown keys with known suffixes ("foo_raw")
            # and remove the suffix to get the actual field. If the data is
            # None or a boolean type, we return it as is.
            if not key.endswith(('_raw', '_url')):
                raise
            maybe_varname = key.rsplit('_', 1)[0]
            field = self.varnames.get(maybe_varname)
            if field is None:
                # report the key that was asked for, not the stripped one
                raise KeyError(key)
            value = self._data.get(field.id)
            # An identity/type test is required here: "value in (None, True,
            # False)" compares by equality and would also let 0 and 1 pass.
            if value is None or isinstance(value, bool):
                # valid suffix and data of the correct type
                return value
            raise KeyError(key)

        if self._data.get(field.id) is None:
            return None
        if str(field.id) not in self._data:
            raise KeyError(key)
        if field.key == 'date':
            return LazyFieldVarDate(self._data, field, self._formdata)
        if field.key == 'map':
            return LazyFieldVarMap(self._data, field, self._formdata)
        return LazyFieldVar(self._data, field, self._formdata)

    def __getattr__(self, attr):
        """Attribute access mapped onto ``__getitem__``."""
        # __getattr__ also fires on instances whose __init__ has not run
        # (e.g. while being rebuilt by copy or pickle); going through
        # self.varnames there would read the missing self._fields and
        # recurse without end.
        if '_fields' not in self.__dict__:
            raise AttributeError(attr)
        try:
            return self.__getitem__(attr)
        except KeyError:
            raise AttributeError(attr)
class LazyFieldVar(object):
    """Wrapper around the stored value of one field.

    The wrapper forwards string-like operations (iteration, concatenation,
    containment, ``startswith``) to the value computed by ``get_value`` and
    gives item access to extra attributes and structured values.
    """

    def __init__(self, data, field, formdata=None):
        self._data = data
        self._field = field
        self._formdata = formdata

    @property
    def raw(self):
        """Stored value, for fields with a display value, files and dates.

        Raises KeyError('raw') for any other field.
        """
        field = self._field
        if not (field.store_display_value or field.key in ('file', 'date')):
            raise KeyError('raw')
        return self._data.get(field.id)

    @property
    def url(self):
        """Download URL of a file field; KeyError('url') otherwise."""
        if self._field.key == 'file' and self._formdata:
            base_url = self._formdata.get_url()
            return '%sdownload?f=%s' % (base_url, self._field.id)
        raise KeyError('url')

    def get_value(self):
        """Return the value to expose for the field.

        The display value is used when the field stores one; otherwise the
        stored value goes through the field's converter if it has one, and
        byte strings are decoded with the site charset.
        """
        field = self._field
        if field.store_display_value:
            return self._data.get('%s_display' % field.id)

        stored = self._data.get(field.id)
        converter = field.convert_value_to_str
        if converter:
            return field.convert_value_to_str(stored)
        if isinstance(stored, str):
            return unicode(stored, get_publisher().site_charset)
        return stored

    def __str__(self):
        current = self.get_value()
        if not isinstance(current, unicode):
            return str(current)
        return current.encode(get_publisher().site_charset)

    def __nonzero__(self):
        # bool fields are judged on the stored value, not on its rendering
        if self._field.key == 'bool':
            stored = self._data.get(self._field.id)
            return bool(stored)
        return bool(self.get_value())

    def __contains__(self, value):
        # "items" fields hold a collection: test against the stored value
        if self._field.key == 'items':
            encoded = qommon.misc.site_encode(value)
            return encoded in self._data.get(self._field.id)
        return value in self.get_value()

    def __unicode__(self):
        as_bytes = str(self)
        return unicode(as_bytes, get_publisher().site_charset)

    def __eq__(self, other):
        mine = unicode(self)
        return mine == unicode(other)

    def __ne__(self, other):
        mine = unicode(self)
        return mine != unicode(other)

    def __getitem__(self, key):
        """Index the value, read an attribute or look into structured data.

        Integer and slice keys index the value of ``get_value``.  Other keys
        are first tried as attributes of this object, then looked up in the
        field's structured value (dictionary key, or list position given as
        a number or its string form).  Raises KeyError when nothing matches.
        """
        if isinstance(key, (int, slice)):
            return self.get_value()[key]

        try:
            attribute = getattr(self, key)
        except AttributeError:
            pass
        else:
            return attribute

        structured = self._field.get_structured_value(self._data)
        if not structured:
            raise KeyError(key)
        if isinstance(structured, dict):
            return structured[key]
        if isinstance(structured, list):
            wanted = str(key)
            for position, entry in enumerate(structured):
                if wanted == str(position):
                    return entry
        raise KeyError(key)

    def __iter__(self):
        current = self.get_value()
        return iter(current)

    def __add__(self, other):
        current = self.get_value()
        return current.__add__(other)

    def __radd__(self, other):
        current = self.get_value()
        return other + current

    def __mul__(self, other):
        current = self.get_value()
        return current.__mul__(other)

    def startswith(self, other):
        current = self.get_value()
        return current.startswith(other)

    def __getstate__(self):
        # pickling is refused on purpose
        raise AssertionError('lazy cannot be pickled')
class DateOperatorsMixin(object):
    """Comparison operators for objects that provide ``timetuple()``.

    Both sides of every comparison are passed through ``parse_datetime``
    before being compared.
    """

    def _comparison_operands(self, other):
        """Return the ``(own, other)`` pair of parsed values to compare.

        ``other`` is first reduced to its ``timetuple()`` when it has one,
        else to its ``get_value()`` when it has one, and is used as is
        otherwise.
        """
        if hasattr(other, 'timetuple'):
            other = other.timetuple()
        elif hasattr(other, 'get_value'):
            other = other.get_value()
        return parse_datetime(self.timetuple()), parse_datetime(other)

    def __eq__(self, other):
        own, theirs = self._comparison_operands(other)
        return own == theirs

    def __ne__(self, other):
        own, theirs = self._comparison_operands(other)
        return own != theirs

    def __gt__(self, other):
        own, theirs = self._comparison_operands(other)
        return own > theirs

    def __lt__(self, other):
        own, theirs = self._comparison_operands(other)
        return own < theirs

    def __ge__(self, other):
        own, theirs = self._comparison_operands(other)
        return own >= theirs

    def __le__(self, other):
        own, theirs = self._comparison_operands(other)
        return own <= theirs
class LazyDateObject(DateOperatorsMixin, SimpleLazyObject):
    """SimpleLazyObject combined with the DateOperatorsMixin comparisons."""
def lazy_date(value):
    """Return a ``LazyDateObject`` whose factory yields ``value``."""
    def produce():
        return value

    return LazyDateObject(produce)
class LazyFieldVarDate(DateOperatorsMixin, LazyFieldVar):
    """Lazy field value for date fields, comparable through the mixin."""

    def get_raw(self):
        """Return the value stored for the field in the data mapping."""
        field_id = self._field.id
        return self._data.get(field_id)

    def timetuple(self):
        """Return the stored value (same result as ``get_raw``)."""
        stored = self.get_raw()
        return stored

    def _struct_time_member(name):  # class-body helper, removed below
        """Build a read-only property forwarding ``name`` to the raw value."""
        def read(self):
            return getattr(self.get_raw(), name)
        return property(read)

    # for backward compatibility with sites using time.struct_time
    # methods we still have to provide time.struct_time properties.
    tm_year = _struct_time_member('tm_year')
    tm_mon = _struct_time_member('tm_mon')
    tm_mday = _struct_time_member('tm_mday')
    tm_hour = _struct_time_member('tm_hour')
    tm_min = _struct_time_member('tm_min')
    tm_sec = _struct_time_member('tm_sec')
    tm_wday = _struct_time_member('tm_wday')
    tm_yday = _struct_time_member('tm_yday')

    del _struct_time_member
class LazyFieldVarMap(LazyFieldVar):
    """Lazy field value for map fields."""

    def split(self, *args, **kwargs):
        """Forward ``split`` to the value stored for the field."""
        # Kept so that the map variable can still be used as a string; the
        # lat/lon properties are the recommended way to read it.
        stored = self._data.get(self._field.id)
        return stored.split(*args, **kwargs)
class LazyUser(object):
    """Wrapper around a user object.

    A few values are exposed as explicit properties; any other attribute is
    read from the wrapped user.  Item access maps onto attribute access.
    """

    def __init__(self, user):
        self._user = user

    @property
    def display_name(self):
        return self._user.display_name

    @property
    def email(self):
        return self._user.email

    @property
    def var(self):
        """A ``LazyFormDataVar`` over the user's formdef fields and form data."""
        return LazyFormDataVar(self._user.formdef.fields, self._user.form_data)

    @property
    def admin_access(self):
        return self._user.can_go_in_admin()

    @property
    def backoffice_access(self):
        return self._user.can_go_in_backoffice()

    @property
    def name_identifier(self):
        """Name identifiers of the user, keyed by position as a string."""
        return {
            str(position): name_identifier
            for position, name_identifier in enumerate(self._user.name_identifiers)
        }

    def __getitem__(self, key):
        """Item access mapped onto attribute access (raises AttributeError)."""
        return getattr(self, key)

    def __getattr__(self, attr):
        """Fall back on the wrapped user for attributes not defined here.

        Raises AttributeError when the wrapped user lacks the attribute too.
        """
        # ``object`` has no __getattr__ to delegate to, so the lookup goes
        # straight to the wrapped user.  When ``_user`` itself is missing
        # (instance whose __init__ has not run) the lookup must stop here,
        # otherwise reading self._user would re-enter this method forever.
        if attr == '_user':
            raise AttributeError(attr)
        return getattr(self._user, attr)
class LazyRequest(object):
    """Wrapper exposing selected values of a request object."""

    def __init__(self, request):
        self._request = request

    @property
    def quixote_request(self):  # compatibility
        """The wrapped request object itself."""
        wrapped = self._request
        return wrapped

    @property
    def GET(self):
        """``GET`` of the request's ``django_request``."""
        django_request = self._request.django_request
        return django_request.GET

    @property
    def META(self):
        """``META`` of the request's ``django_request``."""
        django_request = self._request.django_request
        return django_request.META

    @property
    def is_in_backoffice(self):
        """Result of calling the request's ``is_in_backoffice()``."""
        wrapped = self._request
        return wrapped.is_in_backoffice()

    @property
    def method(self):
        wrapped = self._request
        return wrapped.method

    @property
    def user(self):
        wrapped = self._request
        return wrapped.user