1199 lines
38 KiB
Python
1199 lines
38 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2018 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import warnings
|
|
|
|
from django.utils.encoding import force_text
|
|
from django.utils.functional import SimpleLazyObject
|
|
from pyproj import Geod
|
|
from quixote import get_publisher, get_request
|
|
|
|
from .carddef import CardDef
|
|
from .formdef import FormDef
|
|
from .qommon import _, force_str, misc
|
|
from .qommon.evalutils import make_datetime
|
|
from .qommon.storage import Equal, NotEqual, Null, Or
|
|
from .qommon.substitution import CompatibilityNamesDict
|
|
from .qommon.templatetags.qommon import parse_datetime
|
|
|
|
|
|
class LazyFormDefObjectsManager:
    """Lazy, chainable query helper over the data objects of a formdef.

    The object serves as both manager and queryset: filtering methods return
    a new instance carrying an extended list of criteria, and the actual
    ``data_class().select()`` call is only made when the object is iterated
    or indexed.
    """

    def __init__(
        self, formdef, formdata=None, geoloc_center_formdata=None, criterias=None, order_by=None, limit=None
    ):
        """Store the query parameters; nothing is fetched here.

        When ``criterias`` is None the default criteria are installed
        (status is not "draft", "anonymised" is null).
        """
        self._formdef = formdef
        self._formdata = formdata
        self._geoloc_center_formdata = geoloc_center_formdata
        if criterias is None:
            criterias = [
                NotEqual('status', 'draft'),
                Null('anonymised'),
            ]
            # add custom marker to criteria so it can be found back and removed in
            # drafts() or with_anonymised()
            criterias[0].exclude_drafts = True
            criterias[1].exclude_anonymised = True
        self._criterias = criterias
        self._order_by = order_by
        self._limit = limit
        self._cached_resultset = None

    @property  # @property for backward compatibility
    def count(self):
        """Number of objects matching the criteria (computed once, then cached)."""
        if not hasattr(self, '_count_cache'):
            self._count_cache = self._formdef.data_class().count(clause=self._criterias)
        return self._count_cache

    def _clone(self, criterias, order_by=None):
        """Return a new manager with the given criteria and the same context.

        ``order_by`` replaces the current ordering only when it is truthy.
        """
        return LazyFormDefObjectsManager(
            formdef=self._formdef,
            formdata=self._formdata,
            geoloc_center_formdata=self._geoloc_center_formdata,
            criterias=criterias,
            order_by=order_by or self._order_by,
            limit=self._limit,
        )

    def order_by(self, attribute):
        """Return a copy ordered by ``attribute``."""
        return self._clone(self._criterias, order_by=attribute)

    def limit(self, limit):
        """Return a copy limited to ``limit`` results."""
        qs = self._clone(self._criterias)
        qs._limit = limit
        return qs

    def all(self):
        """Return a copy with the default criteria."""
        # (expose 'all' only to mimick django, it's not actually useful as this
        # object serves as both manager and queryset)
        return self._clone(None)

    def none(self):
        """Return a copy filtered on the '_none' status value."""
        return self._clone([Equal('status', '_none')])

    def pending(self):
        """Restrict to data whose status is one of the workflow's non-endpoint statuses."""
        status_filters = ['wf-%s' % x.id for x in self._formdef.workflow.get_not_endpoint_status()]
        criterias = [Or([Equal('status', x) for x in status_filters])]
        return self._clone(self._criterias + criterias)

    def current_user(self):  # filter on current user
        """Restrict to data owned by the user of the current request."""
        return self.filter_by_user(get_request().user)

    def filter_by_user(self, user):
        """Restrict to data owned by ``user``; a falsy user filters on user id '-1'."""
        return self._clone(self._criterias + [Equal('user_id', str(user.id) if user else '-1')])

    def filter_by_status(self, status):
        """Restrict to data in the workflow status named ``status``.

        Returns ``none()`` when the workflow has no status with that name.
        """
        for wfs in self._formdef.workflow.possible_status:
            if wfs.name == status:
                wf_status = 'wf-%s' % wfs.id
                return self._clone(self._criterias + [Equal('status', wf_status)])
        return self.none()

    def with_custom_view(self, custom_view_slug):
        """Apply the criteria and ordering of a non-owner custom view.

        Returns ``none()`` when no matching custom view is found.
        """
        lookup_criterias = [
            Equal('formdef_type', self._formdef.xml_root_node),
            Equal('formdef_id', self._formdef.id),
            NotEqual('visibility', 'owner'),
            Equal('slug', custom_view_slug),
        ]
        try:
            custom_view = get_publisher().custom_view_class.select(lookup_criterias)[0]
        except IndexError:
            return self.none()
        return self._clone(self._criterias + custom_view.get_criterias(), order_by=custom_view.order_by)

    def exclude_self(self):
        """Exclude the formdata this manager was created from.

        For a formdata without id, its ``_edited_id`` is excluded when that
        attribute exists; otherwise nothing is excluded.
        """
        assert self._formdata
        if not self._formdata.id:
            if hasattr(self._formdata, '_edited_id'):
                return self._clone(self._criterias + [NotEqual('id', str(self._formdata._edited_id))])
            return self._clone(self._criterias)
        return self._clone(self._criterias + [NotEqual('id', str(self._formdata.id))])

    def same_user(self):
        """Restrict to data of the same user as the current formdata.

        Returns ``self`` unchanged when the formdata has no user id.
        """
        assert self._formdata
        if self._formdata.user_id is None:
            return self
        return self._clone(self._criterias + [Equal('user_id', str(self._formdata.user.id))])

    def drafts(self):
        """Select drafts: drop the draft-exclusion criterion, require status 'draft'."""
        criterias = [x for x in self._criterias if not getattr(x, 'exclude_drafts', False)]
        return self._clone(criterias + [Equal('status', 'draft')])

    def with_anonymised(self):
        """Include anonymised data by dropping only the anonymised-exclusion criterion."""
        # The getattr default must be False: criteria without the marker have
        # to be kept, only the one tagged in __init__ is removed.
        criterias = [x for x in self._criterias if not getattr(x, 'exclude_anonymised', False)]
        return self._clone(criterias)

    def done(self):
        """Restrict to data whose status is one of the workflow's endpoint statuses."""
        status_filters = ['wf-%s' % x.id for x in self._formdef.workflow.get_endpoint_status()]
        criterias = [Or([Equal('status', x) for x in status_filters])]
        return self._clone(self._criterias + criterias)

    def set_geo_center(self, lazy_formdata):
        """Return a copy using ``lazy_formdata`` as center for distance filters."""
        qs = self._clone(self._criterias)
        qs._geoloc_center_formdata = lazy_formdata._formdata
        return qs

    def distance_filter(self, distance):
        """Restrict to objects closer than ``distance`` to the center.

        The center is the geolocation of the formdata given to
        ``set_geo_center()`` or, failing that, of the current formdata.  The
        filter is added as a callable criterion; it stores the computed
        distance on each checked object as ``_distance``.
        """
        geod = Geod(ellps='WGS84')
        center = (self._geoloc_center_formdata or self._formdata).get_auto_geoloc()
        center_lon, center_lat = center['lon'], center['lat']

        def distance_check(obj):
            geoloc = obj.get_auto_geoloc()
            if not geoloc:
                return False
            # keep computed distance in object
            obj._distance = geod.inv(center_lon, center_lat, geoloc['lon'], geoloc['lat'])[2]
            return bool(obj._distance < distance)

        return self._clone(self._criterias + [distance_check])

    def filter_by(self, attribute):
        """Return a copy remembering ``attribute`` for a later ``apply_filter_value()``."""
        qs = self._clone(self._criterias)
        qs.pending_attr = attribute
        return qs

    def filter_by_number(self, value):
        """Restrict to the object whose display id is ``value``."""
        return self._clone(self._criterias + [Equal('id_display', str(value))])

    def get_field(self, key):
        """Return the first field whose varname is ``key``, or None."""
        for field in self._formdef.get_all_fields():
            if getattr(field, 'varname', None) == key:
                return field

    def apply_filter_value(self, value):
        """Filter on the field selected by ``filter_by()`` being equal to ``value``.

        An unknown field or a value the field cannot convert is recorded as
        an error on the publisher and yields ``none()``.
        """
        assert self.pending_attr

        field = self.get_field(self.pending_attr)
        if field is None:
            get_publisher().record_error(
                _('Invalid filter "%s"') % self.pending_attr, formdata=self._formdata
            )
            return self.none()

        if field.convert_value_from_anything:
            try:
                value = field.convert_value_from_anything(value)
            except (ValueError, AttributeError):
                get_publisher().record_error(
                    _('Invalid value "%s" for filter "%s"') % (value, self.pending_attr),
                    formdata=self._formdata,
                )
                return self.none()

        from wcs import sql

        criteria = Equal(sql.get_field_id(field), value)
        return self._clone(self._criterias + [criteria])

    def getlist(self, key):
        """Return a LazyList of the values of variable ``key`` over the results."""
        return LazyList(self, key)

    def _populate_cache(self):
        """Run the query once and keep the results wrapped as LazyFormData."""
        if self._cached_resultset is not None:
            return
        result = self._formdef.data_class().select(
            clause=self._criterias, order_by=self._order_by, limit=self._limit
        )
        self._cached_resultset = [LazyFormData(x) for x in result]

    def __getattr__(self, attribute):
        """Resolve the dynamic ``count_status_*``, ``filter_by_*`` and ``formdef`` names."""
        if attribute.startswith('count_status_'):
            # backward compatibility
            status = attribute[len('count_status_') :]
            return len(self._formdef.data_class().get_ids_with_indexed_value('status', 'wf-%s' % status))
        if attribute.startswith('filter_by_'):
            attribute_name = attribute[len('filter_by_') :]
            return lambda: self.filter_by(attribute_name)
        if attribute == 'formdef':
            warnings.warn('Deprecated access to formdef', DeprecationWarning)
            return self._formdef
        raise AttributeError('No such attribute %r' % attribute)

    def __len__(self):
        """Length of the cached result set when loaded, else the result of ``count``."""
        if self._cached_resultset is not None:
            return len(self._cached_resultset)
        return self.count

    def __getitem__(self, key):
        """Return the item or slice ``key`` of the results, loading them if needed."""
        try:
            if not isinstance(key, slice):
                int(key)
        except ValueError:
            # A django template doing formdef.objects.drafts would start by
            # doing ['drafts'], that would raise TypeError and then continue
            # to accessing .drafts (this is done in _resolve_lookup).
            # We need to abort earlier as we don't want to load all formdata
            # in that situation.
            raise TypeError
        self._populate_cache()
        return self._cached_resultset[key]

    def __iter__(self):
        """Iterate over the results as LazyFormData, loading them on first use."""
        self._populate_cache()
        yield from self._cached_resultset

    # Python 2 truth hook, kept as is; Python 3 truth testing goes through
    # __len__ as no __bool__ is defined.
    def __nonzero__(self):
        return any(self)
|
|
|
|
|
|
class LazyList:
    """Lazily computed list of the values of one variable over a lazy manager."""

    def __init__(self, lazy_manager, key):
        self._lazy_manager = lazy_manager
        self._key = key
        self._cached_resultset = None

    def _populate_cache(self):
        """Collect the value of the variable for every item of the manager, once.

        Values exposing ``timetuple`` are stored as their time tuple, other
        non-None values as the result of ``get_value()``; None is kept as is.
        """
        if self._cached_resultset is not None:
            return
        self._cached_resultset = []
        collected = self._cached_resultset
        for item in self._lazy_manager:
            entry = item.get(self._key)
            if entry is not None:
                entry = entry.timetuple() if hasattr(entry, 'timetuple') else entry.get_value()
            collected.append(entry)

    def __len__(self):
        """Length of the cached values when loaded, else length of the manager."""
        if self._cached_resultset is None:
            return len(self._lazy_manager)
        return len(self._cached_resultset)

    def __iter__(self):
        self._populate_cache()
        yield from self._cached_resultset

    # Python 2 truth hook, kept under its original name.
    def __nonzero__(self):
        return any(self)

    def __contains__(self, value):
        """Tell whether ``value`` is among the values.

        Without cached values the check is delegated to a filtered query
        limited to one result; otherwise the cached values are searched after
        trying to convert ``value`` with the matching field.
        """
        if self._cached_resultset is None:
            matches = list(self._lazy_manager.filter_by(self._key).apply_filter_value(value).limit(1))
            return len(matches) == 1

        field = self._lazy_manager.get_field(self._key)
        if field is not None:
            try:
                value = field.convert_value_from_anything(value)
            except (ValueError, AttributeError):
                pass
        return value in list(self)

    def __eq__(self, other):
        return list(self) == list(other)
|
|
|
|
|
|
class LazyFormDef:
    """Lazy wrapper exposing a few attributes of a formdef."""

    def __init__(self, formdef):
        self._formdef = formdef

    @property
    def name(self):
        """Name of the formdef."""
        return self._formdef.name

    @property
    def slug(self):
        """The ``url_name`` of the formdef."""
        return self._formdef.url_name

    @property
    def objects(self):
        """A fresh objects manager for the formdef."""
        return LazyFormDefObjectsManager(self._formdef)

    @property
    def option(self):
        """Lazy access to workflow options, or an empty dict when the
        workflow has no variables formdef."""
        has_variables = self._formdef.workflow.variables_formdef
        return LazyFormDefOptions(self._formdef) if has_variables else {}
|
|
|
|
|
|
class LazyFormData(LazyFormDef):
    """Lazy wrapper exposing a formdata object as substitution variables."""

    def __init__(self, formdata):
        super().__init__(formdata.formdef)
        self._formdata = formdata

    def inspect_keys(self):
        """Yield the public attribute names, skipping a fixed list of hidden ones.

        ``parent`` is yielded as a ``(key, False)`` tuple and only when set.
        """
        hidden_keys = ('field', 'inspect_keys', 'page_no', 'formdef', 'objects')
        for key in dir(self):
            if key[0] == '_' or key in hidden_keys:
                continue
            if key != 'parent':
                yield key
            elif self.parent:  # hide parent when it's None
                yield key, False  # = do not recurse

    @property
    def objects(self):
        """Objects manager bound to this formdata."""
        return LazyFormDefObjectsManager(self._formdef, formdata=self._formdata)

    @property
    def formdef(self):
        """Lazy wrapper around the formdef of the formdata."""
        return LazyFormDef(self._formdata.formdef)

    @property
    def internal_id(self):
        return self._formdata.id

    @property
    def receipt_date(self):
        """Receipt time formatted with the date format returned by ``misc``."""
        return misc.strftime(misc.date_format(), self._formdata.receipt_time)

    @property
    def receipt_time(self):
        """Receipt time formatted as hours and minutes."""
        return misc.strftime('%H:%M', self._formdata.receipt_time)

    @property
    def number(self):
        return self._formdata.get_display_id()

    @property
    def number_raw(self):
        """The id as a string, or None when the formdata has no id."""
        formdata_id = self._formdata.id
        return str(self._formdata.id) if formdata_id else None

    @property
    def url(self):
        return self._formdata.get_url()

    @property
    def url_backoffice(self):
        return self._formdata.get_url(backoffice=True)

    @property
    def backoffice_url(self):
        """Same value as ``url_backoffice``."""
        return self._formdata.get_url(backoffice=True)

    @property
    def api_url(self):
        return self._formdata.get_api_url()

    @property
    def uri(self):
        return '%s/%s/' % (self._formdef.url_name, self._formdata.id)

    @property
    def criticality_level(self):
        return self._formdata.criticality_level

    @property
    def criticality_label(self):
        """Name of the criticality level object, None if looking it up raises IndexError."""
        try:
            level = self._formdata.get_criticality_level_object()
            return level.name
        except IndexError:
            return None

    @property
    def digest(self):
        return self._formdata.digest

    @property
    def display_name(self):
        return self._formdata.get_display_name()

    @property
    def receipt_datetime(self):
        """Receipt time converted with ``make_datetime``, None if unset."""
        if not self._formdata.receipt_time:
            return None
        return make_datetime(self._formdata.receipt_time)

    @property
    def last_update_datetime(self):
        """Last update time converted with ``make_datetime``, None if unset."""
        stamp = self._formdata.last_update_time
        if not stamp:
            return None
        return make_datetime(stamp)

    @property
    def status(self):
        return self._formdata.get_status_label()

    @property
    def status_is_endpoint(self):
        return self._formdata.is_at_endpoint_status()

    @property
    def tracking_code(self):
        """Tracking code of the formdata.

        For a formdata without status but with data, the
        ``future_tracking_code`` entry is returned when present, else the
        tracking code of the formdata referenced by ``draft_formdata_id``.
        """
        source = self._formdata
        if not source.status and source.data:
            if 'future_tracking_code' in source.data:
                return source.data['future_tracking_code']
            if 'draft_formdata_id' in source.data:
                source = source.formdef.data_class().get(source.data['draft_formdata_id'])
        return source.tracking_code

    @property
    def submission_backoffice(self):
        return self._formdata.backoffice_submission

    @property
    def submission_channel(self):
        return self._formdata.submission_channel

    @property
    def submission_channel_label(self):
        return self._formdata.get_submission_channel_label()

    @property
    def submission_agent(self):
        """Lazy user for the submission agent, None on TypeError or KeyError."""
        try:
            return LazyUser(get_publisher().user_class.get(self._formdata.submission_agent_id))
        except (TypeError, KeyError):
            return None

    @property
    def submission_context(self):
        return self._formdata.submission_context

    @property
    def status_url(self):
        return '%sstatus' % self._formdata.get_url()

    @property
    def details(self):
        return self._formdef.get_detailed_email_form(self._formdata, self._formdata.get_url())

    @property
    def user(self):
        """Lazy wrapper around the formdata user, None when there is none."""
        owner = self._formdata.get_user()
        if not owner:
            return None
        return LazyUser(owner)

    @property
    def var(self):
        return LazyFormDataVar(self._formdef.get_all_fields(), self._formdata.data, self._formdata)

    @property
    def field(self):
        """Dictionary of field values keyed by simplified field labels."""
        # no lazy dictionary here as it's legacy.
        values = {}
        for field in self._formdef.get_all_fields():
            if not hasattr(field, 'get_view_value'):
                continue
            value = self._formdata.data.get(field.id)
            if value is None:
                value = ''
            elif field.convert_value_to_str:
                value = field.convert_value_to_str(value)
            values[misc.simplify(field.label, space='_')] = value

        return values

    @property
    def role(self):
        """Substitution variables of the workflow roles.

        Roles set on the formdata override those set on the formdef; roles
        whose lookup raises KeyError are skipped.
        """
        workflow_roles = {}
        if self._formdef.workflow_roles:
            workflow_roles.update(self._formdef.workflow_roles)
        if self._formdata.workflow_roles:
            workflow_roles.update(self._formdata.workflow_roles)

        variables = {}
        for role_type, role_id in workflow_roles.items():
            prefix = '%s_' % role_type.replace('-', '_').strip('_')
            try:
                variables.update(get_publisher().role_class.get(role_id).get_substitution_variables(prefix))
            except KeyError:
                pass
        return variables

    @property
    def comment(self):
        """Comment of the last evolution, or the empty string."""
        evolution = self._formdata.evolution
        if evolution and self._formdata.evolution[-1].comment:
            return self._formdata.evolution[-1].comment
        return ''

    @property
    def page_no(self):
        return int(self._formdata.page_no)

    @property
    def attachments(self):
        from wcs.workflows import AttachmentsSubstitutionProxy

        return AttachmentsSubstitutionProxy(self._formdata)

    @property
    def geoloc(self):
        """Geolocations keyed by name, plus ``<name>_lat`` / ``<name>_lon`` entries."""
        data = {}
        geolocations = self._formdata.geolocations
        if geolocations:
            for name, position in self._formdata.geolocations.items():
                data[name] = position
                data[name + '_lat'] = position.get('lat')
                data[name + '_lon'] = position.get('lon')
        return data

    @property
    def distance(self):
        """The ``_distance`` attribute of the formdata, None if it has none."""
        return getattr(self._formdata, '_distance', None)

    @property
    def previous_status(self):
        """Label of the most recent evolution status differing from the
        current status, or the empty string."""
        if self._formdata.evolution:
            for step in reversed(self._formdata.evolution):
                if step.status and step.status != self._formdata.status:
                    return self._formdata.get_status_label(step.status)
        return ''

    @property
    def status_changed(self):
        """Whether the last evolution is the entry into the current status.

        True only if the status label differs from the previous one, the last
        evolution carries a status, it is the earliest evolution of the
        current run of the current status, and it has no
        ``last_jump_datetime``.
        """
        first_in_current_status = None
        for step in reversed(self._formdata.evolution or []):
            if step.status and step.status != self._formdata.status:
                break
            if step.status:
                first_in_current_status = step

        if not (self.status != self.previous_status and self._formdata.evolution):
            return False
        return bool(
            self._formdata.evolution[-1].status
            and first_in_current_status is self._formdata.evolution[-1]
            and not self._formdata.evolution[-1].last_jump_datetime
        )

    @property
    def evolution(self):
        return self._formdef.get_detailed_evolution(self._formdata)

    @property
    def links(self):
        from .wf.create_formdata import LazyFormDataLinks

        return LazyFormDataLinks(self._formdata)

    @property
    def parent(self):
        """Substitution variables of the parent formdata, None without parent."""
        parent_formdata = self._formdata.get_parent()
        if parent_formdata is None:
            return None
        return parent_formdata.get_substitution_variables()

    @property
    def workflow_data(self):
        """Workflow data without the keys starting with an underscore."""
        data = self._formdata.workflow_data or {}
        return {key: data[key] for key in data if not key.startswith('_')}

    def export_to_json(self, include_files=True):
        # this gets used to generate an email attachment :/
        return self._formdata.export_to_json(include_files=include_files)

    def get(self, key):
        """Return the variable named ``key``.

        The name is first looked up on ``var``; on AttributeError it is
        resolved through a CompatibilityNamesDict rooted at ``form``.
        """
        # compatibility with |get filter, to return a field by varname
        try:
            return getattr(self.var, key)
        except AttributeError:
            # fallback to CompatibilityNamesDict, this allows filters to do
            # queryset|first|get:"form_var_plop"
            return CompatibilityNamesDict({'form': self}).get(key)

    def __getitem__(self, key):
        """Attribute access by key, with ``f<field id>`` giving raw field data.

        The original AttributeError is re-raised when nothing matches.
        """
        try:
            return getattr(self, key)
        except AttributeError:
            if key.startswith('f'):
                wanted_id = str(key[1:])
                for field in self._formdef.get_all_fields():
                    if str(field.id) == wanted_id:
                        return self._formdata.data.get(field.id)
            raise
|
|
|
|
|
|
class LazyFormDataVar:
    """Lazy access, by varname, to the values of a list of fields."""

    def __init__(self, fields, data, formdata=None):
        self._fields = fields
        self._data = data or {}
        self._formdata = formdata

    def inspect_keys(self):
        return self.varnames.keys()

    # cache for the varnames property, filled on first access
    _varnames = None

    @property
    def varnames(self):
        """Mapping of varname to field, computed once.

        When several fields share a varname the first one is kept as long as
        it has a value (truthy, or False); otherwise a later field replaces it.
        """
        if self._varnames is not None:
            return self._varnames
        self._varnames = {}
        mapping = self._varnames
        for field in self._fields:
            if not field.varname:
                continue
            if field.varname in mapping:
                # duplicated varname
                previous_value = self._data.get(mapping[field.varname].id)
                if previous_value or previous_value is False:
                    # previous field had a value (not None or the empty string),
                    # stay on it.
                    continue
                # else continue and update _varnames with new field reference.
            mapping[field.varname] = field
        return self._varnames

    def get_field_kwargs(self, field):
        """Keyword arguments used to build the lazy variable for ``field``."""
        return {'data': self._data, 'field': field, 'formdata': self._formdata}

    def __getitem__(self, key):
        """Return the lazy variable (or plain value) for varname ``key``.

        Raises KeyError for unknown names and for known fields absent from
        the data.
        """
        try:
            field = self.varnames[key]
        except KeyError:
            # key was unknown but for some values we may still have to provide
            # multiple keys (for example file fields will expect to have both
            # form_var_foo and form_var_foo_raw set to None) and user
            # conditions may use the "is" operator that cannot be overridden
            # (this applies to None as well as boolean values).
            #
            # Therefore we catch unknown keys with known suffixes ("foo_raw")
            # and remove the suffix to get the actual field. If the data is
            # None or a boolean type, we return it as is.
            if not key.endswith(('_raw', '_url')):
                raise
            field = self.varnames[key.rsplit('_', 1)[0]]
            # NOTE(review): this branch tests field.type while the class
            # dispatch below uses field.key - confirm both are equivalent.
            if key.endswith('_raw') and field.type == 'bool':
                # turn None into False so boolean fields are always a boolean.
                return bool(self._data.get(field.id))

            if self._data.get(field.id) in (None, True, False):
                # valid suffix and data of the correct type
                return self._data.get(field.id)
            raise KeyError(key)

        # let boolean pass through, to get None handled as False
        if field.type != 'bool' and self._data.get(field.id) is None:
            return None

        if str(field.id) not in self._data:
            raise KeyError(key)

        default_class = LazyFieldVarStructured if field.store_structured_value else LazyFieldVar
        custom_classes = {  # custom types
            'date': LazyFieldVarDate,
            'map': LazyFieldVarMap,
            'password': LazyFieldVarPassword,
            'file': LazyFieldVarFile,
            'block': LazyFieldVarBlock,
            'bool': LazyFieldVarBool,
        }
        var_class = custom_classes.get(field.key, default_class)

        return var_class(**self.get_field_kwargs(field))

    def __getattr__(self, attr):
        """Attribute access maps to item access; KeyError becomes AttributeError."""
        try:
            return self.__getitem__(attr)
        except KeyError:
            raise AttributeError(attr)
|
|
|
|
|
|
class LazyFieldVar:
    """Lazy wrapper around the value of a single field.

    Most operators are forwarded to the value returned by ``get_value()``.
    """

    def __init__(self, data, field, formdata=None, **kwargs):
        self._data = data
        self._field = field
        self._formdata = formdata
        self._field_kwargs = kwargs

    @property
    def raw(self):
        """Stored value of the field.

        Only available for fields storing a display value and for 'file' and
        'date' fields; raises AttributeError otherwise.
        """
        has_raw = self._field.store_display_value or self._field.key in ('file', 'date')
        if not has_raw:
            raise AttributeError('raw')
        return self._data.get(self._field.id)

    def get_value(self):
        """Return the display value, the converted value or the stored value."""
        field = self._field
        if field.store_display_value:
            return self._data.get('%s_display' % field.id)
        stored = self._data.get(field.id)
        if field.convert_value_to_str:
            return field.convert_value_to_str(stored)
        if isinstance(stored, str):
            return force_text(stored, get_publisher().site_charset)
        return stored

    def __str__(self):
        return force_str(self.get_value())

    def __bool__(self):
        """Truth of the stored data for 'bool' fields, of the value otherwise."""
        if self._field.key == 'bool':
            return bool(self._data.get(self._field.id))
        return bool(self.get_value())

    __nonzero__ = __bool__

    def __contains__(self, value):
        """Membership test; 'items' fields are checked against their stored data."""
        if self._field.key != 'items':
            return value in self.get_value()
        return misc.site_encode(value) in self._data.get(self._field.id)

    def __unicode__(self):
        return force_text(str(self), get_publisher().site_charset)

    def __eq__(self, other):
        return force_text(self) == force_text(other)

    def __ne__(self, other):
        return force_text(self) != force_text(other)

    def __getitem__(self, key):
        """Index or slice the value; other keys are looked up as attributes.

        Raises KeyError when the attribute lookup fails.
        """
        if isinstance(key, (int, slice)):
            return self.get_value()[key]
        missing = object()
        found = getattr(self, key, missing)
        if found is missing:
            raise KeyError(key)
        return found

    def __iter__(self):
        return iter(self.get_value())

    def __add__(self, other):
        return self.get_value().__add__(other)

    def __radd__(self, other):
        return other + self.get_value()

    def __mul__(self, other):
        return self.get_value().__mul__(other)

    def __len__(self):
        return len(self.get_value())

    def __int__(self):
        return int(self.get_value())

    def startswith(self, other):
        return self.get_value().startswith(other)

    def strip(self, *args):
        """Deprecated: strip the value, emitting a DeprecationWarning."""
        warnings.warn('Deprecated use of .strip method', DeprecationWarning)
        return self.get_value().strip(*args)

    def __getstate__(self):
        # lazy variables must never end up in a pickle
        raise AssertionError('lazy cannot be pickled')
|
|
|
|
|
|
class LazyFieldVarStructured(LazyFieldVar):
    """Lazy variable for fields that also store a structured value."""

    def inspect_keys(self):
        """List the keys available on this variable.

        Without structured value the list is empty, or ``['raw']`` when the
        field has stored data.  Otherwise it holds 'raw', 'structured',
        'live' (for a carddef data source whose ``live`` lookup succeeds) and
        the flattened keys of the structured value.
        """
        structured_value = self._field.get_structured_value(self._data)
        if not structured_value:
            return ['raw'] if self._data.get(self._field.id) else []

        keys = ['raw', 'structured']
        if self._field.data_source and self._field.data_source.get('type', '').startswith('carddef:'):
            try:
                self.live
            except AttributeError:
                # don't advertise "live" if linked data is missing
                pass
            else:
                keys.append('live')

        def walk(base, value):
            # leaves are recorded under the underscore-joined path of their keys
            if not isinstance(value, dict):
                keys.append(base)
                return
            for name, child in value.items():
                if CompatibilityNamesDict.valid_key_regex.match(name):
                    walk(name if not base else base + '_' + name, child)

        if isinstance(structured_value, list):
            for position, element in enumerate(structured_value):
                walk(str(position), element)
        else:
            walk('', structured_value)

        return keys

    @property
    def structured_raw(self):
        # backward compatibility, _structured should be use.
        return self._field.get_structured_value(self._data)

    @property
    def structured(self):
        return self._field.get_structured_value(self._data)

    @property
    def live(self):
        """Lazy form data of the card referenced by the field.

        Raises AttributeError when the data source is not a carddef or when
        the lookup raises KeyError.  When a request is available the outcome
        (card data or error) is cached on it in ``live_card_cache``.
        """
        if not (self._field.data_source and self._field.data_source.get('type', '').startswith('carddef:')):
            raise AttributeError('live')
        request = get_request()
        card_id = self._data.get(self._field.id)
        if request:
            # cache during request
            cache_key = '%s-%s' % (self._field.data_source['type'], card_id)
            if hasattr(request, 'live_card_cache'):
                cached = request.live_card_cache.get(cache_key)
                if cached is not None:
                    # cached data
                    if isinstance(cached, Exception):
                        raise cached
                    return LazyFormData(cached)
            else:
                request.live_card_cache = {}
        from wcs.carddef import CardDef

        try:
            carddef = CardDef.get_by_urlname(self._field.data_source['type'].split(':')[1])
            carddata = carddef.data_class().get(card_id)
            if request:
                request.live_card_cache[cache_key] = carddata
        except KeyError:
            if request:
                request.live_card_cache[cache_key] = AttributeError('live')
            raise AttributeError('live')
        return LazyFormData(carddata)

    def __getitem__(self, key):
        """Base lookup first, then lookup in the structured value.

        A dict structured value is indexed by ``key``; a list one by the
        position whose string form equals ``str(key)``.
        """
        try:
            return super().__getitem__(key)
        except KeyError:
            pass
        structured_value = self._field.get_structured_value(self._data)
        if not structured_value:
            raise KeyError(key)
        if isinstance(structured_value, dict):
            return structured_value[key]
        if isinstance(structured_value, list):
            wanted = str(key)
            for position, element in enumerate(structured_value):
                if wanted == str(position):
                    return element
        raise KeyError(key)
|
|
|
|
|
|
class DateOperatorsMixin:
    """Comparison operators for objects exposing ``timetuple()``.

    Both sides of a comparison are passed through ``parse_datetime`` before
    being compared.
    """

    def _date_operands(self, other):
        """Return the parsed ``(self, other)`` pair used by all operators.

        ``other`` is first replaced by its ``timetuple()`` or, failing that,
        its ``get_value()`` result when it has such a method.
        """
        if hasattr(other, 'timetuple'):
            other = other.timetuple()
        elif hasattr(other, 'get_value'):
            other = other.get_value()
        return parse_datetime(self.timetuple()), parse_datetime(other)

    def __eq__(self, other):
        mine, theirs = self._date_operands(other)
        return mine == theirs

    def __ne__(self, other):
        mine, theirs = self._date_operands(other)
        return mine != theirs

    def __gt__(self, other):
        mine, theirs = self._date_operands(other)
        return mine > theirs

    def __lt__(self, other):
        mine, theirs = self._date_operands(other)
        return mine < theirs

    def __ge__(self, other):
        mine, theirs = self._date_operands(other)
        return mine >= theirs

    def __le__(self, other):
        mine, theirs = self._date_operands(other)
        return mine <= theirs
|
|
|
|
|
|
class LazyDateObject(DateOperatorsMixin, SimpleLazyObject):
    """SimpleLazyObject using the comparison operators of DateOperatorsMixin."""
|
|
|
|
|
|
def lazy_date(value):
    """Wrap ``value`` in a LazyDateObject."""

    def resolve():
        return value

    return LazyDateObject(resolve)
|
|
|
|
|
|
class LazyFieldVarDate(DateOperatorsMixin, LazyFieldVar):
    """Lazy variable for date fields, comparable through DateOperatorsMixin."""

    def inspect_keys(self):
        return ['year', 'month', 'day']

    def get_raw(self):
        """Stored value of the field."""
        return self._data.get(self._field.id)

    def timetuple(self):
        """Same as ``get_raw()``."""
        return self.get_raw()

    # for backward compatibility with sites using time.struct_time
    # methods we still have to provide time.struct_time properties.
    @property
    def tm_year(self):
        raw = self.get_raw()
        return raw.tm_year

    @property
    def tm_mon(self):
        raw = self.get_raw()
        return raw.tm_mon

    @property
    def tm_mday(self):
        raw = self.get_raw()
        return raw.tm_mday

    @property
    def tm_hour(self):
        raw = self.get_raw()
        return raw.tm_hour

    @property
    def tm_min(self):
        raw = self.get_raw()
        return raw.tm_min

    @property
    def tm_sec(self):
        raw = self.get_raw()
        return raw.tm_sec

    @property
    def tm_wday(self):
        raw = self.get_raw()
        return raw.tm_wday

    @property
    def tm_yday(self):
        raw = self.get_raw()
        return raw.tm_yday

    # short aliases, these are the names listed by inspect_keys()
    year = tm_year
    month = tm_mon
    day = tm_mday
|
|
|
|
|
|
class LazyFieldVarMap(LazyFieldVarStructured):
    """Lazy variable for map fields."""

    def split(self, *args, **kwargs):
        """Call ``split`` on the stored value, forwarding all arguments."""
        # Compatibility with usage of map variable as a string. It is
        # recommended to use lat/lon properties instead.
        stored = self._data.get(self._field.id)
        return stored.split(*args, **kwargs)

    def inspect_keys(self):
        return ['lat', 'lon']
|
|
|
|
|
|
class LazyFieldVarBool(LazyFieldVar):
    """Lazy variable for boolean fields; the value is always a bool."""

    def get_value(self):
        """Stored data coerced to bool (missing data gives False)."""
        stored = self._data.get(self._field.id)
        return bool(stored)

    @property
    def raw(self):
        """Same as ``get_value()``."""
        return self.get_value()
|
|
|
|
|
|
class LazyFieldVarPassword(LazyFieldVar):
    """Lazy variable for password fields, giving access to stored sub-parts."""

    def __getitem__(self, key):
        """Return the sub-part ``key`` of the stored value when it exists.

        Any other key is resolved by ``LazyFieldVar.__getitem__`` (index,
        slice or attribute name), which raises KeyError for unknown names.
        """
        # get subpart (cleartext, md5, sha1) if it exists
        field_value = self._data.get(self._field.id)
        # a missing stored value has no sub-part, go straight to the fallback
        if field_value is not None and key in field_value:
            return field_value[key]
        # the method to call on the parent is __getitem__ (two leading
        # underscores); with a single one the fallback raised AttributeError
        return super().__getitem__(key)
|
|
|
|
|
|
class LazyFieldVarFile(LazyFieldVar):
    """Lazy variable for file fields, adding a download URL."""

    @property
    def url(self):
        """Download URL computed by the field for the current formdata.

        Extra keyword arguments received by the constructor are forwarded.
        """
        extra = self._field_kwargs
        return self._field.get_download_url(formdata=self._formdata, **extra)
|
|
|
|
|
|
class LazyBlockDataVar(LazyFormDataVar):
    """Variables of one row of a block field."""

    def __init__(self, fields, data, formdata=None, parent_field=None, parent_field_index=0):
        super().__init__(fields, data, formdata=formdata)
        self.parent_field = parent_field
        self.parent_field_index = parent_field_index

    def get_field_kwargs(self, field):
        """Base keyword arguments plus the parent field and the row index."""
        kwargs = super().get_field_kwargs(field)
        kwargs.update(
            parent_field=self.parent_field,
            parent_field_index=self.parent_field_index,
        )
        return kwargs
|
|
|
|
|
|
class LazyFieldVarBlock(LazyFieldVar):
    """Lazy variable for block fields; rows are read from the 'data' entry
    of the stored value."""

    def inspect_keys(self):
        """Row indexes (as strings) for multi-item blocks, ``['var']`` otherwise."""
        if self._field.max_items > 1:
            rows = self._formdata.data.get(self._field.id)['data']
            return [str(index) for index, _ in enumerate(rows)]
        return ['var']

    def get_value(self):
        # don't give access to underlying data dictionary.
        return self._data.get(str(self._field.id))

    def __str__(self):
        return self._data.get('%s_display' % self._field.id, '---')

    def __getitem__(self, key):
        """Return the variables of row ``key``.

        Keys that ``int()`` rejects with ValueError are resolved by the base
        class.
        """
        try:
            int(key)
        except ValueError:
            return super().__getitem__(key)
        index = int(key)
        row = self._formdata.data.get(self._field.id)['data'][index]
        return LazyBlockDataVar(
            self._field.block.fields,
            row,
            formdata=self._formdata,
            parent_field=self._field,
            parent_field_index=index,
        )

    def __len__(self):
        rows = self._formdata.data.get(self._field.id)['data']
        return len(rows)

    @property
    def var(self):
        # alias when there's a single item
        return self[0]

    def __iter__(self):
        rows = self._formdata.data.get(self._field.id)['data']
        for index, _ in enumerate(rows):
            yield self[index]

    def getlist(self, key):
        """Return the stored values of sub-field ``key`` for every row.

        Raises AttributeError when the block has no field with that varname.
        """
        # called by |getlist filter
        matching = [field for field in self._field.block.fields if field.varname == key]
        if not matching:
            raise AttributeError('No such attribute %r' % key)
        field = matching[0]
        return [row.get(field.id) for row in self._formdata.data.get(self._field.id)['data']]
|
|
|
|
|
|
class LazyUser:
    """Lazy wrapper around a user object.

    Attributes not defined here are looked up on the wrapped user.
    """

    def __init__(self, user):
        self._user = user

    def inspect_keys(self):
        return ['display_name', 'email', 'var', 'nameid']

    @property
    def display_name(self):
        return self._user.display_name

    @property
    def email(self):
        return self._user.email

    @property
    def var(self):
        """Lazy access to the user form data, by varname."""
        return LazyFormDataVar(self._user.get_formdef().fields, self._user.form_data)

    @property
    def admin_access(self):
        return self._user.can_go_in_admin()

    @property
    def backoffice_access(self):
        return self._user.can_go_in_backoffice()

    @property
    def name_identifier(self):
        """Name identifiers of the user keyed by their position, as strings."""
        return {str(i): name_identifier for i, name_identifier in enumerate(self._user.name_identifiers)}

    @property
    def nameid(self):
        return self._user.nameid

    def __getitem__(self, key):
        """Item access is attribute access (a missing key raises AttributeError)."""
        return getattr(self, key)

    def __getattr__(self, attr):
        """Fall back to the wrapped user for attributes not found on the wrapper.

        Raises AttributeError when the wrapped user is not set yet, instead
        of recursing on the ``_user`` lookup.
        """
        # __getattr__ only runs after normal lookup failed, and the implicit
        # object base class has no __getattr__ to delegate to, so go straight
        # to the wrapped user.
        if attr == '_user':
            # instance created without __init__ (copy, unpickling probes...)
            raise AttributeError(attr)
        return getattr(self._user, attr)
|
|
|
|
|
|
class LazyRequest:
    """Lazy wrapper exposing selected attributes of a request."""

    def __init__(self, request):
        self._request = request

    @property
    def quixote_request(self):  # compatibility
        """The wrapped request itself."""
        return self._request

    @property
    def GET(self):
        """``GET`` of the underlying django request."""
        django_request = self._request.django_request
        return django_request.GET

    @property
    def META(self):
        """``META`` of the underlying django request."""
        django_request = self._request.django_request
        return django_request.META

    @property
    def is_in_backoffice(self):
        return self._request.is_in_backoffice()

    @property
    def method(self):
        return self._request.method

    @property
    def user(self):
        return self._request.user

    @property
    def view_name(self):
        """``view_name`` of the request, None when it has none."""
        return getattr(self._request, 'view_name', None)
|
|
|
|
|
|
class LazyFormDefOptions(LazyFormDataVar):
    """Lazy access to the workflow options of a formdef."""

    def __init__(self, formdef):
        """Build the variables from the workflow variables fields.

        Note that the ``id`` of those field objects is overwritten in place.
        """
        self._formdef = formdef
        option_fields = self._formdef.workflow.variables_formdef.fields
        for option_field in option_fields:
            # change field IDs as options are stored in data with their
            # varnames, not id.
            option_field.id = option_field.varname or option_field.id
        super().__init__(option_fields, self._formdef.workflow_options)
|
|
|
|
|
|
class CardsSource:
    """Substitution source giving access to card definitions by url name."""

    @classmethod
    def get_substitution_variables(cls):
        """Return a dictionary with a new instance under the 'cards' key."""
        source = cls()
        return {'cards': source}

    def inspect_keys(self):
        return []

    def __getattr__(self, attr):
        """Return the lazy card definition whose url name is ``attr``.

        A KeyError from the lookup is turned into AttributeError.
        """
        try:
            carddef = CardDef.get_by_urlname(attr)
            return LazyFormDef(carddef)
        except KeyError:
            raise AttributeError(attr)
|
|
|
|
|
|
class FormsSource:
    """Substitution source giving access to form definitions by url name."""

    @classmethod
    def get_substitution_variables(cls):
        """Return a dictionary with a new instance under the 'forms' key."""
        source = cls()
        return {'forms': source}

    def inspect_keys(self):
        return []

    def __getattr__(self, attr):
        """Return the lazy form definition whose url name is ``attr``.

        A KeyError from the lookup is turned into AttributeError.
        """
        try:
            formdef = FormDef.get_by_urlname(attr)
            return LazyFormDef(formdef)
        except KeyError:
            raise AttributeError(attr)
|