# w.c.s. - web application for online forms # Copyright (C) 2005-2018 Entr'ouvert # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, see . import json import warnings from django.utils import formats from django.utils.encoding import force_str from django.utils.functional import SimpleLazyObject from quixote import get_publisher, get_request, get_session from quixote.errors import RequestError from wcs.sql_criterias import ( And, Between, Contains, Distance, Equal, Greater, GreaterOrEqual, ILike, Less, LessOrEqual, Not, NotEqual, Nothing, NotNull, Null, Or, StrictNotEqual, ) from .carddef import CardDef, CardDefDoesNotExist from .formdata import get_workflow_roles_substitution_variables from .formdef import FormDef, FormDefDoesNotExist from .qommon import _, misc from .qommon.evalutils import make_datetime from .qommon.substitution import CompatibilityNamesDict from .qommon.templatetags.qommon import parse_datetime class LazyFormDefObjectsManager: def __init__( self, formdef, formdata=None, geoloc_center_formdata=None, criterias=None, order_by=None, limit=None, report_error_type=None, ): self._formdef = formdef self._formdata = formdata self._geoloc_center_formdata = geoloc_center_formdata if criterias is None: criterias = [ StrictNotEqual('status', 'draft'), Null('anonymised'), ] # add custom marker to criteria so it can be found back and removed in # drafts() or with_anonymised() criterias[0].exclude_drafts = True criterias[1].exclude_anonymised = True self._criterias = criterias self._order_by = order_by self._limit = limit self._cached_resultset = None self._report_error_type = report_error_type or 'record-error' def report_error(self, error_message): if self._report_error_type == 'request-error': raise RequestError(error_message) if self._report_error_type == 'session-error': get_session().message = ('warning', error_message) if self._report_error_type == 'record-error': get_publisher().record_error( error_message, formdata=self._formdata, ) @property # @property for backward compatibility def count(self): if not hasattr(self, '_count_cache'): self._count_cache = self._formdef.data_class().count(clause=self._criterias) return self._count_cache def _clone(self, criterias, order_by=None): return LazyFormDefObjectsManager( formdef=self._formdef, formdata=self._formdata, geoloc_center_formdata=self._geoloc_center_formdata, criterias=criterias, order_by=order_by or self._order_by, limit=self._limit, ) def order_by(self, attribute): if not isinstance(attribute, str): self.report_error( _('Invalid value %r for "order_by"') % attribute, ) return self.none() field = self.get_field(attribute) return self._clone(self._criterias, order_by=field or attribute) def limit(self, limit): qs = self._clone(self._criterias) qs._limit = limit return qs def all(self): # (expose 'all' only to mimick django, it's not actually useful as this # object serves as both manager and queryset) return self._clone(None) def none(self): return self._clone([Nothing()]) def pending(self): status_filters = ['wf-%s' % x.id for x in self._formdef.workflow.get_not_endpoint_status()] criterias = [Contains('status', status_filters)] return self._clone(self._criterias + criterias) def current_user(self): # filter on current user return self.filter_by_user(get_request().user) def filter_by_user(self, user, op='eq'): if op not in ['eq']: self.report_error( _('Invalid operator "%(operator)s" for filter "%(filter)s"') % {'operator': self.get_operator_name(op), 'filter': self.pending_attr}, ) return self.none() if isinstance(user, str): user = get_publisher().user_class.lookup_by_string(user) if not user: return self.none() return self._clone(self._criterias + [Equal('user_id', str(user.id))]) def filter_by_status(self, status, op='eq'): if op not in ['eq', 'ne']: self.report_error( _('Invalid operator "%(operator)s" for filter "%(filter)s"') % {'operator': self.get_operator_name(op), 'filter': self.pending_attr}, ) return self.none() for wfs in self._formdef.workflow.possible_status: if wfs.name == status: wf_status = 'wf-%s' % wfs.id return self._clone( self._criterias + [self.get_criteria_from_operator(op=op, value=wf_status, field_id='status')] ) return self.none() def with_custom_view(self, custom_view_slug): lookup_criterias = [ Equal('formdef_type', self._formdef.xml_root_node), Equal('formdef_id', self._formdef.id), Contains('visibility', ['any', 'datasource']), Equal('slug', custom_view_slug), ] try: custom_view = get_publisher().custom_view_class.select(lookup_criterias)[0] except IndexError: self.report_error(_('Unknown custom view "%(slug)s"') % {'slug': custom_view_slug}) return self.none() return self._clone(self._criterias + custom_view.get_criterias(), order_by=custom_view.order_by) def exclude_self(self): assert self._formdata if not self._formdata.id: if getattr(self._formdata, '_edited_id', None): return self._clone(self._criterias + [StrictNotEqual('id', str(self._formdata._edited_id))]) return self._clone(self._criterias) return self._clone(self._criterias + [StrictNotEqual('id', str(self._formdata.id))]) def same_user(self): assert self._formdata if self._formdata.user_id is None: return self return self._clone(self._criterias + [Equal('user_id', str(self._formdata.user.id))]) def drafts(self): criterias = [x for x in self._criterias if not getattr(x, 'exclude_drafts', False)] return self._clone(criterias + [Equal('status', 'draft')]) def with_anonymised(self): criterias = [x for x in self._criterias if not getattr(x, 'exclude_anonymised', True)] return self._clone(criterias) def with_drafts(self): criterias = [x for x in self._criterias if not getattr(x, 'exclude_drafts', False)] return self._clone(criterias) def done(self): status_filters = ['wf-%s' % x.id for x in self._formdef.workflow.get_endpoint_status()] criterias = [Contains('status', status_filters)] return self._clone(self._criterias + criterias) def set_geo_center(self, lazy_formdata): qs = self._clone(self._criterias) qs._geoloc_center_formdata = lazy_formdata._formdata return qs def filter_by_distance(self, distance, op=None): try: distance = int(misc.unlazy(distance)) except (TypeError, ValueError): get_publisher().record_error(_('invalid value for distance (%r)') % distance) distance = None center = (self._geoloc_center_formdata or self._formdata).get_auto_geoloc() if center is None or distance is None: return self.none() return self._clone(self._criterias + [Distance(center, distance)]) def filter_by(self, attribute): qs = self._clone(self._criterias) qs.pending_attr = attribute return qs def filter_by_internal_id(self, value, op='eq'): from wcs.backoffice.filter_fields import InternalIdFilterField field = InternalIdFilterField(formdef=self._formdef) operators = self.get_field_allowed_operators(field) if op not in [o[0] for o in operators]: self.report_error( _('Invalid operator "%(operator)s" for filter "%(filter)s"') % {'operator': self.get_operator_name(op), 'filter': self.pending_attr}, ) return self.none() try: int(value) except (ValueError, TypeError): self.report_error( _('Invalid value "%s" for filter "internal_id"') % (value), ) return self.none() return self._clone( self._criterias + [self.get_criteria_from_operator(op=op, value=str(value), field_id='id')] ) def filter_by_number(self, value, op='eq'): if op not in ['eq']: self.report_error( _('Invalid operator "%(op)s" for filter "number"') % {'op': self.get_operator_name(op)} ) return self.none() return self._clone(self._criterias + [Equal('id_display', str(value))]) def filter_by_identifier(self, value, op='eq'): if op not in ['eq']: self.report_error( _('Invalid operator "%(op)s" for filter "identifier"') % {'op': self.get_operator_name(op)} ) return self.none() return self._clone(self._criterias + [self._formdef.get_by_id_criteria(str(value))]) def get_fields(self, key): for field in self._formdef.iter_fields(include_block_fields=True, with_no_data_fields=False): if getattr(field, 'block_field', None): if field.key == 'items': # not yet continue if field.contextual_varname == key: yield field def get_field(self, key): for field in self.get_fields(key): return field def get_field_allowed_operators(self, field): equality_operators = [ ('eq', '='), ('ne', '!='), ] comparison_operators = [ ('lt', '<'), ('lte', '<='), ('gt', '>'), ('gte', '>='), ] more_comparison_operators = [ ('between', _('between')), ] empty_operators = [ ('absent', _('absent')), ('existing', _('existing')), ] in_operators = [ ('in', _('in')), ('not_in', _('not in')), ] text_operators = [ ('icontains', _('contains')), ] if field.key == 'internal-id': return equality_operators + comparison_operators if field.key in ['string', 'text']: return ( equality_operators + comparison_operators + more_comparison_operators + in_operators + empty_operators + text_operators ) if field.key in ['date', 'item', 'items', 'numeric']: return ( equality_operators + comparison_operators + more_comparison_operators + in_operators + empty_operators ) if field.key == 'bool': return equality_operators + empty_operators if field.key == 'email': return equality_operators + in_operators + empty_operators + text_operators if field.key == 'file': return empty_operators return None def format_value(self, op, value, field): pending_attr = getattr(self, 'pending_attr', None) if not pending_attr: pending_attr = field.varname or field.id if self._report_error_type == 'request-error': pending_attr = 'filter-%s' % pending_attr def check_int(val): if isinstance(val, str) and '_' in val: # do not consider _ a valid character in numbers # (unlike python where it can be used to group digits) return val try: # cast to integer so it can be used with numerical operators # (limit to 32bits to match postgresql integer range) int_value = int(val) if -(2**31) <= int_value < 2**31 and (int_value == 0 or str(value)[0] != '0'): return int_value except (ValueError, TypeError): return str(val) return val def convert_value(value, field): if field.convert_value_from_anything and value is not Ellipsis: try: value = field.convert_value_from_anything(value) except (ValueError, AttributeError): self.report_error( _('Invalid value "%(value)s" for filter "%(filter)s"') % {'value': value, 'filter': pending_attr}, ) raise ValueError if value is not None and hasattr(field, 'block_field') and hasattr(field, 'get_json_value'): # in block fields, we store the whole block as json, so we filter # against json values value = field.get_json_value(value) return value if field.key not in ['date', 'item', 'items', 'string', 'text', 'bool', 'email', 'numeric']: return convert_value(value=value, field=field) if op in ['in', 'not_in', 'between'] and field.key != 'items': if isinstance(value, (tuple, list)): new_value = list(value)[:] else: new_value = [ convert_value(value=x.strip(), field=field) for x in str(value).split('|') if x.strip() ] if op == 'between' and len(new_value) != 2: if self._report_error_type != 'session-error': # don't report error for management view self.report_error( _('Invalid value "%(value)s" for operator "%(operator)s" and filter "%(filter)s"') % { 'value': value, 'operator': self.get_operator_name(op), 'filter': pending_attr, }, ) raise ValueError value = new_value else: value = convert_value(value=value, field=field) if ( op not in ['absent', 'existing', 'in', 'not_in', 'between'] and field.key == 'items' and value and len(value) == 1 ): # items field, if only one value operators are allowed, we need a single value value = value[0] if field.key in ['string', 'item', 'items']: if isinstance(value, list): value = [check_int(v) for v in value] # make sure all elements are of the same type if not all(isinstance(v, int) for v in value): value = [str(v) for v in value] else: value = check_int(value) return value def get_criteria_from_operator(self, op, value, field_id, field=None): operators_mapping = { 'eq': Equal, 'ne': NotEqual, 'lt': Less, 'lte': LessOrEqual, 'gt': Greater, 'gte': GreaterOrEqual, 'in': Contains, 'icontains': ILike, } if isinstance(value, list) and op in ['eq', 'ne']: # items field, with a list of values: use in and not_in operators op = 'in' if op == 'eq' else 'not_in' if op == 'not_in': return Not(self.get_criteria_from_operator(op='in', value=value, field_id=field_id, field=field)) if op == 'existing': return Not( self.get_criteria_from_operator(op='absent', value=value, field_id=field_id, field=field) ) if op in operators_mapping: return operators_mapping[op](field_id, value, field=field) if op == 'absent': criterias = [] if field: if not getattr(field, 'block_field', None): criterias.append(Null(field_id, field=field)) elif field.key == 'bool': criterias.append( And( [ Not(Equal(field_id, True, field=field)), Not(Equal(field_id, False, field=field)), ] ) ) elif field.key == 'date': criterias.append(Equal(field_id, '', field=field)) if field.key not in ['bool', 'date', 'numeric']: criterias.append(Equal(field_id, '', field=field)) if field.key == 'items': criterias.append(Equal(field_id, [])) return Or(criterias) if op == 'between': if not len(value) == 2: return Nothing() min_value, max_value = value[0], value[1] if min_value > max_value: min_value, max_value = max_value, min_value return Between(field_id, [min_value, max_value], field=field) def get_operator_name(self, op): operator_names_mapping = { 'eq': 'equal', 'ne': 'not_equal', 'lt': 'less_than', 'lte': 'less_than_or_equal', 'gt': 'greater_than', 'gte': 'greater_than_or_equal', } return operator_names_mapping.get(op) or op def apply_filter_value(self, value, exclude=False): if not hasattr(self, 'pending_attr'): self.report_error(_('|filter_value called without |filter_by')) return self.none() if not self.pending_attr: self.report_error(_('|filter_value called without attribute (check |filter_by parameter)')) return self.none() op = 'ne' if exclude else getattr(self, 'pending_op', 'eq') if self.pending_attr in ['status', 'user', 'internal_id', 'number', 'distance', 'identifier']: return getattr(self, 'filter_by_%s' % self.pending_attr)(value, op) fields = list(self.get_fields(self.pending_attr)) if not fields: self.report_error(_('Invalid filter "%s"') % self.pending_attr) return self.none() # check operator for field in fields: if field.key not in [ 'date', 'item', 'items', 'string', 'text', 'bool', 'email', 'numeric', 'file', ]: continue operators = self.get_field_allowed_operators(field) or [] if op not in [o[0] for o in operators]: self.report_error( _('Invalid operator "%(operator)s" for filter "%(filter)s"') % {'operator': self.get_operator_name(op), 'filter': self.pending_attr}, ) return self.none() if value is not None: try: # consider all fields with same varname are of the same type # (it should definitely be) value = self.format_value(op=op, value=value, field=fields[0]) except ValueError: return self.none() from wcs import sql # build criterias criterias = [] for field in fields: field_id = sql.get_field_id(field) if value is None: if op not in ['eq', 'ne']: # None value with comparison operator (le, lte, gt, gte, in, not_in, between ...) return self.none() if hasattr(field, 'block_field'): # no None values in block field data, apply absent/existing filters if exclude: criteria = self.get_criteria_from_operator( op='existing', value=value, field_id=field_id, field=field ) else: criteria = self.get_criteria_from_operator( op='absent', value=value, field_id=field_id, field=field ) else: criteria_class = NotNull if exclude else Null criteria = criteria_class(field_id) elif field.key not in [ 'date', 'item', 'items', 'string', 'text', 'bool', 'email', 'numeric', 'file', ]: criteria_class = NotEqual if exclude else Equal criteria = criteria_class(field_id, value, field=field) else: criteria = self.get_criteria_from_operator(op=op, value=value, field_id=field_id, field=field) criterias.append(criteria) if len(criterias) > 1: if exclude: criterias = [And(criterias)] else: criterias = [Or(criterias)] return self._clone(self._criterias + criterias) def apply_exclude_value(self, value): if hasattr(self, 'pending_op'): self.report_error( _('Operator filter is not allowed for exclude_value filter'), ) return self.none() return self.apply_filter_value(value, exclude=True) def apply_op(self, op): self.pending_op = op if op in ['absent', 'existing']: return self.apply_filter_value(Ellipsis) return self def apply_eq(self): return self.apply_op('eq') def apply_ne(self): return self.apply_op('ne') def apply_lt(self): return self.apply_op('lt') def apply_lte(self): return self.apply_op('lte') def apply_gt(self): return self.apply_op('gt') def apply_gte(self): return self.apply_op('gte') def apply_in(self): return self.apply_op('in') def apply_not_in(self): return self.apply_op('not_in') def apply_absent(self): return self.apply_op('absent') def apply_existing(self): return self.apply_op('existing') def apply_between(self): return self.apply_op('between') def apply_icontains(self): return self.apply_op('icontains') def getlist(self, key): return LazyList(self, key) def getlistdict(self, keys): results = [] for lazy_formdata in self: result = {} for key in keys: value = lazy_formdata.get(key) if hasattr(value, 'timetuple'): value = value.timetuple() elif hasattr(value, 'get_value'): value = value.get_value() result[key] = value results.append(result) return results def _populate_cache(self): if self._cached_resultset is not None: return result = self._formdef.data_class().select_iterator( clause=self._criterias, order_by=self._order_by, limit=self._limit, itersize=200 ) self._cached_resultset = [LazyFormData(x) for x in result] def __getattr__(self, attribute): if attribute.startswith('count_status_'): # backward compatibility status = attribute[len('count_status_') :] return len(self._formdef.data_class().get_ids_with_indexed_value('status', 'wf-%s' % status)) if attribute.startswith('filter_by_'): attribute_name = attribute[len('filter_by_') :] return lambda: self.filter_by(attribute_name) if attribute == 'formdef': warnings.warn('Deprecated access to formdef', DeprecationWarning) return self._formdef raise AttributeError(attribute) def __len__(self): if self._cached_resultset is not None: return len(self._cached_resultset) return self.count def __getitem__(self, key): try: if not isinstance(key, slice): int(key) except ValueError: # A django template doing formdef.objects.drafts would start by # doing ['drafts'], that would raise TypeError and then continue # to accessing .drafts (this is done in _resolve_lookup). # We need to abort earlier as we don't want to load all formdata # in that situation. raise TypeError self._populate_cache() return self._cached_resultset[key] def __iter__(self): self._populate_cache() yield from self._cached_resultset def __nonzero__(self): return any(self) class LazyList: def __init__(self, lazy_manager, key): self._lazy_manager = lazy_manager self._key = key self._cached_resultset = None def _populate_cache(self): if self._cached_resultset is not None: return self._cached_resultset = [] for lazy_formdata in self._lazy_manager: value = lazy_formdata.get(self._key) if value is not None: if hasattr(value, 'timetuple'): value = value.timetuple() elif hasattr(value, 'get_value'): value = value.get_value() self._cached_resultset.append(value) def __len__(self): if self._cached_resultset is not None: return len(self._cached_resultset) return len(self._lazy_manager) def __repr__(self): return '' % ( self._lazy_manager._formdef.verbose_name, _(':'), self._lazy_manager._formdef.name, _('attribute:'), self._key, ) def __iter__(self): self._populate_cache() yield from self._cached_resultset def __nonzero__(self): return any(self) def __contains__(self, value): if self._cached_resultset is not None: field = self._lazy_manager.get_field(self._key) if field is not None: try: value = field.convert_value_from_anything(value) except (ValueError, AttributeError): pass return value in list(self) queryset = list(self._lazy_manager.filter_by(self._key).apply_filter_value(value).limit(1)) return len(queryset) == 1 def __eq__(self, other): return list(self) == list(other) def __getitem__(self, key): return list(self)[key] def get_value(self): # unlazy operation return list(self) class LazyFormDef: def __init__(self, formdef): self._formdef = formdef @property def name(self): return self._formdef.name @property def slug(self): return self._formdef.url_name @property def class_name(self): # reserved for logged errors return self._formdef.__class__.__name__ @property def objects(self): return LazyFormDefObjectsManager(self._formdef) @property def option(self): return LazyFormDefOptions(self._formdef) @property def type(self): return self._formdef.xml_root_node @property def backoffice_submission_url(self): return self._formdef.get_backoffice_submission_url() @property def frontoffice_submission_url(self): return self._formdef.get_url() @property def publication_disabled(self): return self._formdef.is_disabled() @property def publication_datetime(self): return self._formdef.publication_datetime @property def publication_expiration_datetime(self): return self._formdef.expiration_datetime class LazyFormData(LazyFormDef): # noqa pylint: disable=too-many-public-methods def __init__(self, formdata): super().__init__(formdata.formdef) self._formdata = formdata def inspect_keys(self): hidden_keys = {'field', 'inspect_keys', 'page_no', 'formdef', 'objects'} if self.type != 'formdef' or not self._formdata.id: hidden_keys.add('short_url') for key in dir(self): if key[0] == '_' or key in hidden_keys: continue if key == 'parent': if self.parent: # hide parent when it's None yield key, False # = do not recurse else: yield key @property def objects(self): return LazyFormDefObjectsManager(self._formdef, formdata=self._formdata) @property def formdef(self): return LazyFormDef(self._formdata.formdef) @property def internal_id(self): if hasattr(self._formdata, '_edited_id'): # when a formdata is being edited the transient object keeps # the original id in this attribute. return self._formdata._edited_id return self._formdata.id @property def receipt_date(self): if not self._formdata.receipt_time: return '' return formats.date_format(make_datetime(self._formdata.receipt_time)) @property def receipt_time(self): if not self._formdata.receipt_time: return '' return formats.time_format(make_datetime(self._formdata.receipt_time)) @property def identifier(self): return self._formdata.identifier @property def number(self): return self._formdata.get_display_id() @property def number_raw(self): return str(self._formdata.id) if self._formdata.id else None @property def url(self): return self._formdata.get_url() @property def url_backoffice(self): return self._formdata.get_url(backoffice=True) @property def backoffice_url(self): return self._formdata.get_url(backoffice=True) @property def api_url(self): return self._formdata.get_api_url() @property def short_url(self): return self._formdata.get_short_url() @property def uri(self): return '%s/%s/' % (self._formdef.url_name, self._formdata.id) @property def criticality_level(self): return self._formdata.criticality_level @property def criticality_label(self): try: return self._formdata.get_criticality_level_object().name except IndexError: return None @property def digest(self): return self._formdata.default_digest @property def display_name(self): return self._formdata.get_display_name() @property def receipt_datetime(self): return make_datetime(self._formdata.receipt_time) if self._formdata.receipt_time else None @property def last_update_datetime(self): last_update_time = self._formdata.last_update_time return make_datetime(last_update_time) if last_update_time else None @property def status(self): return self._formdata.get_status_label() @property def status_is_endpoint(self): return self._formdata.is_at_endpoint_status() @property def tracking_code(self): formdata = self._formdata if not formdata.status and formdata.data: if 'future_tracking_code' in formdata.data: return formdata.data['future_tracking_code'] elif 'draft_formdata_id' in formdata.data: formdata = formdata.formdef.data_class().get(formdata.data['draft_formdata_id']) return formdata.tracking_code @property def submission_backoffice(self): return self._formdata.backoffice_submission @property def submission_channel(self): return self._formdata.submission_channel @property def submission_channel_label(self): return self._formdata.get_submission_channel_label() @property def submission_agent(self): try: return LazyUser(get_publisher().user_class.get(self._formdata.submission_agent_id)) except (TypeError, KeyError): return None @property def submission_context(self): return self._formdata.submission_context @property def status_url(self): return '%sstatus' % self._formdata.get_url() @property def details(self): return self._formdata.get_form_details() _cached_user = Ellipsis @property def user(self): if self._cached_user is Ellipsis: user = self._formdata.get_user() self._cached_user = LazyUser(user) if user else None return self._cached_user @property def var(self): return LazyFormDataVar(self._formdef.get_all_fields(), self._formdata.data, self._formdata) @property def field(self): # no lazy dictionary here as it's legacy. d = {} for field in self._formdef.get_all_fields(): if not hasattr(field, 'get_view_value'): continue value = self._formdata.data.get(field.id) if value is not None and field.convert_value_to_str: value = field.convert_value_to_str(value) elif value is None: value = '' identifier_name = misc.simplify(field.label, space='_') d[identifier_name] = value return d @property def role(self): workflow_roles = {} if self._formdef.workflow_roles: workflow_roles.update(self._formdef.workflow_roles) if self._formdata.workflow_roles: workflow_roles.update(self._formdata.workflow_roles) return get_workflow_roles_substitution_variables(workflow_roles) @property def comment(self): if self._formdata.evolution: latest_evolution = self._formdata.evolution[-1] return latest_evolution.get_plain_text_comment() or latest_evolution.comment or '' return '' @property def page_no(self): return int(self._formdata.page_no) @property def attachments(self): from wcs.workflows import AttachmentsSubstitutionProxy return AttachmentsSubstitutionProxy(self._formdata) @property def geoloc(self): data = {} if self._formdata.geolocations: for k, v in self._formdata.geolocations.items(): data[k] = v data[k + '_lat'] = v.get('lat') data[k + '_lon'] = v.get('lon') return data @property def distance(self): return getattr(self._formdata, '_distance', None) @property def previous_status(self): if getattr(self._formdata, '_previous_status', None): # for minimal formdata object used during edition. return self._formdata._previous_status if self._formdata.evolution: for evolution in reversed(self._formdata.evolution): if evolution.status and evolution.status != self._formdata.status: return self._formdata.get_status_label(evolution.status) return '' @property def status_changed(self): first_evolution_in_current_status = None for evolution in reversed(self._formdata.evolution or []): if evolution.status and evolution.status != self._formdata.status: break if evolution.status: first_evolution_in_current_status = evolution return bool( self.status != self.previous_status and self._formdata.evolution and self._formdata.evolution[-1].status and first_evolution_in_current_status is self._formdata.evolution[-1] and not self._formdata.evolution[-1].last_jump_datetime ) @property def evolution(self): return self._formdef.get_detailed_evolution(self._formdata) @property def links(self): from .wf.create_formdata import LazyFormDataLinks return LazyFormDataLinks(self._formdata) @property def reverse_links(self): return LazyFormDataReverseLinks(self._formdata) @property def workflow_email(self): # form_ workflow_email_ _ _ etc. # ex: form_email_xxx_2_addresses from .wf.sendmail import LazyFormDataEmailsBase return LazyFormDataEmailsBase(self._formdata) @property def workflow_form(self): # form_ workflow_form_ _ _var_ etc. # ex: form_workflow_form_xxx_2_var_file # (index can be "latest") from .wf.form import LazyFormDataWorkflowForms return LazyFormDataWorkflowForms(self._formdata) @property def trigger(self): # form_ trigger_ _ _content_ etc. # ex: form_trigger_paid_content_XXX # (index can be "latest") from .wf.jump import LazyFormDataWorkflowTriggers return LazyFormDataWorkflowTriggers(self._formdata) @property def parent(self): formdata = self._formdata.get_parent() if formdata is None: return None return formdata.get_substitution_variables() @property def workflow_data(self): data = self._formdata.workflow_data or {} return {k: data[k] for k in data if not k.startswith('_')} def export_to_json(self, include_files=True): # this gets used to generate an email attachment :/ return self._formdata.export_to_json(include_files=include_files) def get(self, key): # compatibility with |get filter, to return a field by varname try: return getattr(self.var, key) except AttributeError: # fallback to CompatibilityNamesDict, this allows filters to do # queryset|first|get:"form_var_plop" compat_dict = CompatibilityNamesDict({'form': self}) return compat_dict.get(key) except Exception as e: get_publisher().record_error(_('|get called with invalid key (%s)') % key, exception=e) return None def __getitem__(self, key): try: return getattr(self, str(key)) except AttributeError: if isinstance(key, str) and key.startswith('f'): for field in self._formdef.get_all_fields(): if str(field.id).replace('-', '_') == str(key[1:]): return self._formdata.data.get(field.id) raise class LazyFormDataReverseLinks: def __init__(self, formdata): self._formdata = formdata _relations = None @property def relations(self): if self._relations is not None: return self._relations self._relations = {} for relation in self._formdata.formdef.reverse_relations: if not relation['varname']: continue key = '%s_%s' % (relation['obj'], relation['varname']) key = key.replace(':', '_').replace('-', '_') self._relations[key] = relation return self._relations def inspect_keys(self): return self.relations.keys() def __getitem__(self, key): if not isinstance(key, str): raise KeyError(str(key)) relation = self.relations[key] formdef_type, formdef_slug = relation['obj'].split(':') formdef_class = FormDef if formdef_type == 'carddef': formdef_class = CardDef formdef = formdef_class.get_by_slug(formdef_slug, ignore_errors=True) if formdef is None: return None lazy_manager = LazyFormDefObjectsManager(formdef=formdef) formdatas = [ LazyFormDataReverseLinksItem(formdata) for formdata in lazy_manager.filter_by(relation['varname']).apply_filter_value( str(self._formdata.get_natural_key()) ) ] if not formdatas: return [] return LazyFormDataReverseLinksItems(formdatas) class LazyFormDataReverseLinksItems: inspect_collapse = True def __init__(self, formdatas): self._formdatas = formdatas def inspect_keys(self): return [str(x) for x in range(len(self._formdatas))] def __getitem__(self, key): try: key = int(key) except ValueError: raise KeyError return self._formdatas[key] def __len__(self): return len(self._formdatas) def __iter__(self): yield from self._formdatas class LazyFormDataReverseLinksItem: inspect_collapse = True def __init__(self, formdata): self._formdata = formdata def inspect_keys(self): return ['form'] @property def form(self): return self._formdata def __getitem__(self, key): try: return getattr(self, key) except AttributeError: compat_dict = CompatibilityNamesDict({'form': self._formdata}) return compat_dict[key] class LazyFormDataVar: def __init__(self, fields, data, formdata=None, base_formdata=None): self._fields = fields self._data = data or {} self._formdata = formdata self._base_formdata = base_formdata def inspect_keys(self): return self.varnames.keys() _varnames = None @property def varnames(self): if self._varnames is not None: return self._varnames self._varnames = {} for field in self._fields: if field.is_no_data_field: continue if not field.varname or not CompatibilityNamesDict.valid_key_regex.match(field.varname): continue if field.varname in self._varnames: # duplicated varname value = self._data.get(self._varnames[field.varname].id) if value or value is False: # previous field had a value (not None or the empty string), # stay on it. continue # else continue and update _varnames with new field reference. self._varnames[field.varname] = field return self._varnames def get_field_kwargs(self, field): return { 'data': self._data, 'field': field, 'formdata': self._formdata, 'base_formdata': self._base_formdata, } def __getitem__(self, key): if not isinstance(key, str): raise KeyError(str(key)) try: field = self.varnames[key] except KeyError: # key was unknown but for some values we may still have to provide # multiple keys (for example file fields will expect to have both # form_var_foo and form_var_foo_raw set to None) and user # conditions may use the "is" operator that cannot be overridden # (this applies to None as well as boolean values). # # Therefore we catch unknown keys with known suffixes ("foo_raw") # and remove the suffix to get the actual field. If the data is # None or a boolean type, we return it as is. if not (key.endswith('_raw') or key.endswith('_url')): raise maybe_varname = key.rsplit('_', 1)[0] field = self.varnames[maybe_varname] if key.endswith('_raw') and field.key == 'bool': # turn None into False so boolean fields are always a boolean. return bool(self._data.get(field.id)) if self._data.get(field.id) in (None, True, False): # valid suffix and data of the correct type return self._data.get(field.id) raise KeyError(key) # let boolean pass through, to get None handled as False if field.key != 'bool': if self._data.get(field.id) is None: return NoneFieldVar(**self.get_field_kwargs(field)) if str(field.id) not in self._data: raise KeyError(key) klass = LazyFieldVar if field.store_structured_value: klass = LazyFieldVarStructured klass = { # custom types 'date': LazyFieldVarDate, 'map': LazyFieldVarMap, 'password': LazyFieldVarPassword, 'file': LazyFieldVarFile, 'block': LazyFieldVarBlock, 'bool': LazyFieldVarBool, 'computed': LazyFieldVarComputed, 'items': LazyFieldVarItems, }.get(field.key, klass) return klass(**self.get_field_kwargs(field)) def __getattr__(self, attr): try: return self.__getitem__(attr) except KeyError: raise AttributeError(attr) class LazyFieldVar: def __init__(self, data, field, formdata=None, base_formdata=None, **kwargs): self._data = data self._field = field self._formdata = formdata self._base_formdata = base_formdata self._field_kwargs = kwargs @property def raw(self): if self._field.store_display_value or self._field.key in ('file', 'date'): return self._data.get(self._field.id) raise AttributeError('raw') def get_value(self): if self._field.store_display_value: return self._data.get('%s_display' % self._field.id) value = self._data.get(self._field.id) if self._field.convert_value_to_str: return self._field.convert_value_to_str(value) return value def __str__(self): return force_str(self.get_value()) def __nonzero__(self): if self._field.key == 'bool': return bool(self._data.get(self._field.id)) return bool(self.get_value()) __bool__ = __nonzero__ def __contains__(self, value): return value in self.get_value() def __eq__(self, other): return force_str(self) == force_str(other) def __ne__(self, other): return force_str(self) != force_str(other) def __lt__(self, other): return force_str(self) < force_str(other) def __le__(self, other): return force_str(self) <= force_str(other) def __gt__(self, other): return force_str(self) > force_str(other) def __ge__(self, other): return force_str(self) >= force_str(other) def __getitem__(self, key): if isinstance(key, (int, slice)): return self.get_value()[key] try: return getattr(self, key) except AttributeError: pass raise KeyError(key) def __iter__(self): return iter(self.get_value()) def __add__(self, other): return self.get_value().__add__(other) def __radd__(self, other): return other + self.get_value() def __mul__(self, other): return self.get_value().__mul__(other) def __len__(self): return len(self.get_value()) def __int__(self): return int(self.get_value()) def startswith(self, other): return self.get_value().startswith(other) def strip(self, *args): warnings.warn('Deprecated use of .strip method', DeprecationWarning) return self.get_value().strip(*args) def split(self, *args, **kwargs): # Compatibility with usage of variable as a string. It is # recommended to use appropriate properties instead. return str(self).split(*args, **kwargs) def __getstate__(self): raise AssertionError('lazy cannot be pickled') class NoneFieldVar(LazyFieldVar): def get_value(self): return None def __len__(self): return 0 class LazyFieldVarComplex(LazyFieldVar): def has_live_data_source(self): real_data_source = self._field.get_real_data_source() if not real_data_source: return False if real_data_source.get('type', '') == 'wcs:users': return True if real_data_source.get('type', '').startswith('carddef:'): return True return False def inspect_keys(self): keys = [] structured_value = self.get_field_var_value() def walk(base, value): if isinstance(value, dict): for k, v in value.items(): if CompatibilityNamesDict.valid_key_regex.match(k): walk(k if not base else base + '_' + k, v) else: keys.append(base) if isinstance(structured_value, list): for i, value in enumerate(structured_value): walk(str(i), value) elif isinstance(structured_value, dict): walk('', structured_value) return keys def __getitem__(self, key): try: return super().__getitem__(key) except KeyError: pass structured_value = self.get_field_var_value() if not structured_value: raise KeyError(key) if isinstance(structured_value, dict): return structured_value[key] if isinstance(structured_value, list): for i, struct_value in enumerate(structured_value): if str(key) == str(i): return struct_value raise KeyError(key) class LazyFieldVarLiveCardMixin: def get_data_id(self): return self._data.get(self._field.id) @property def live(self): real_data_source = self._field.get_real_data_source() if not ( real_data_source and ( real_data_source.get('type', '').startswith('carddef:') or real_data_source.get('type', '') == 'wcs:users' ) ): raise AttributeError('live') if real_data_source.get('type', '') == 'wcs:users': try: return LazyUser(get_publisher().user_class.get(self.get_data_id())) except KeyError: return None request = get_request() card_id = self.get_data_id() if request: # cache during request cache_key = '%s-%s' % (self._field.data_source['type'], card_id) if not hasattr(request, 'live_card_cache'): request.live_card_cache = {} else: carddata = request.live_card_cache.get(cache_key, Ellipsis) if carddata is None: return None elif carddata is not Ellipsis: # cached data return LazyFormData(carddata) from wcs.carddef import CardDef try: carddef = CardDef.get_by_urlname(self._field.data_source['type'].split(':')[1], use_cache=True) carddata = carddef.data_class().get_by_id(card_id) if request: request.live_card_cache[cache_key] = carddata except KeyError: if request: request.live_card_cache[cache_key] = None return None return LazyFormData(carddata) class LazyFieldVarComputed(LazyFieldVarComplex, LazyFieldVarLiveCardMixin): def inspect_keys(self): keys = super().inspect_keys() try: self.live except AttributeError: pass # don't advertise if there's no value behind else: keys.append('live') return keys def get_field_var_value(self): return self.get_value() class LazyFieldVarStructured(LazyFieldVarComplex, LazyFieldVarLiveCardMixin): def inspect_keys(self): if not self._data.get(self._field.id): return [] real_data_source = self._field.get_real_data_source() if ( real_data_source and self._field.key in ('item', 'items') and real_data_source.get('type', '') == 'wcs:users' ): return ['raw', 'live'] structured_value = self._field.get_structured_value(self._data) if not structured_value: return ['raw'] keys = ['raw', 'structured'] if ( real_data_source and self._field.key in ('item', 'items') and real_data_source.get('type', '').startswith('carddef:') ): try: self.live except (AttributeError, KeyError): # don't advertise "live" if linked data is missing pass else: keys.append('live') keys.extend(super().inspect_keys()) return keys @property def structured_raw(self): # backward compatibility, _structured should be use. return self._field.get_structured_value(self._data) @property def structured(self): return self._field.get_structured_value(self._data) def get_field_var_value(self): return self.structured class DateOperatorsMixin: def __eq__(self, other): if hasattr(other, 'timetuple'): other = other.timetuple() elif hasattr(other, 'get_value'): other = other.get_value() return parse_datetime(self.timetuple()) == parse_datetime(other) def __ne__(self, other): if hasattr(other, 'timetuple'): other = other.timetuple() elif hasattr(other, 'get_value'): other = other.get_value() return parse_datetime(self.timetuple()) != parse_datetime(other) def __gt__(self, other): if hasattr(other, 'timetuple'): other = other.timetuple() elif hasattr(other, 'get_value'): other = other.get_value() return parse_datetime(self.timetuple()) > parse_datetime(other) def __lt__(self, other): if hasattr(other, 'timetuple'): other = other.timetuple() elif hasattr(other, 'get_value'): other = other.get_value() return parse_datetime(self.timetuple()) < parse_datetime(other) def __ge__(self, other): if hasattr(other, 'timetuple'): other = other.timetuple() elif hasattr(other, 'get_value'): other = other.get_value() return parse_datetime(self.timetuple()) >= parse_datetime(other) def __le__(self, other): if hasattr(other, 'timetuple'): other = other.timetuple() elif hasattr(other, 'get_value'): other = other.get_value() return parse_datetime(self.timetuple()) <= parse_datetime(other) class LazyDateObject(DateOperatorsMixin, SimpleLazyObject): pass def lazy_date(value): return LazyDateObject(lambda: value) class LazyFieldVarDate(DateOperatorsMixin, LazyFieldVar): def inspect_keys(self): return ['year', 'month', 'day'] def get_raw(self): return self._data.get(self._field.id) def timetuple(self): return self.get_raw() # for backward compatibility with sites using time.struct_time # methods we still have to provide time.struct_time properties. @property def tm_year(self): return self.get_raw().tm_year @property def tm_mon(self): return self.get_raw().tm_mon @property def tm_mday(self): return self.get_raw().tm_mday @property def tm_hour(self): return self.get_raw().tm_hour @property def tm_min(self): return self.get_raw().tm_min @property def tm_sec(self): return self.get_raw().tm_sec @property def tm_wday(self): return self.get_raw().tm_wday @property def tm_yday(self): return self.get_raw().tm_yday year = tm_year month = tm_mon day = tm_mday class LazyFieldVarMap(LazyFieldVarStructured): def inspect_keys(self): return ['lat', 'lon', 'reverse'] if self.get_field_var_value() else [] @property def reverse(self): return LazyFieldVarMapReverse(lat=self['lat'], lon=self['lon']) class LazyFieldVarMapReverse: inspect_collapse = True def __init__(self, lat, lon): self.lat = lat self.lon = lon def inspect_keys(self): return ['address'] @property def address(self): data = misc.get_reverse_geocoding_data(self.lat, self.lon) return (json.loads(data) or {}).get('address') class LazyFieldVarLiveSequenceItem(LazyFieldVarLiveCardMixin): def __init__(self, field, data, idx): self._field = field self._data = data self.idx = idx def get_data_id(self): return self._data.get(self._field.id)[self.idx] class LazyFieldVarLiveSequence: def __init__(self, field, data): self._field = field self._data = data self._list = data.get(self._field.id) or [] def inspect_keys(self): # do not advertise index of unavailable items for i in range(len(self)): # noqa pylint: disable=consider-using-enumerate try: self[i] except KeyError: pass # removed item else: yield str(i) def __len__(self): return len(self._list) def __iter__(self): yield from self._list def __getitem__(self, key): try: key = int(key) except ValueError: raise KeyError(key) try: return LazyFieldVarLiveSequenceItem(self._field, self._data, key).live except (AttributeError, IndexError): raise KeyError(key) class LazyFieldVarItems(LazyFieldVarStructured): @property def live(self): return LazyFieldVarLiveSequence(self._field, self._data) def getlist(self, key): structured_value = self.structured if structured_value: return [str(x.get(key, '')) for x in structured_value or []] elif key in ('id', 'text'): # simple source, where there's no structure value, just a simple # list of values that can be considered both id and text. return self.raw or [] return [] def getlistdict(self, keys): return [{key: str(x.get(key, '')) or None for key in keys} for x in self.structured or []] def __len__(self): return len(self.getlist('id')) def __contains__(self, value): return str(value) in self.getlist('id') or (self.structured and str(value) in self.getlist('text')) def get_iterable_value(self): try: return [self.live[i] for i in range(len(self.live))] except KeyError: return self.structured or self.raw class LazyFieldVarBool(LazyFieldVar): def get_value(self): return bool(self._data.get(self._field.id)) @property def raw(self): return self.get_value() class LazyFieldVarPassword(LazyFieldVar): def __getitem__(self, key): # get subpart (cleartext, md5, sha1) if it exists field_value = self._data.get(self._field.id) if key in field_value: return field_value[key] return super()._getitem__(key) class LazyFieldVarFile(LazyFieldVar): def inspect_keys(self): keys = ['raw'] # raw value should always have a get_fs_filename method, this protects against # invalid values. if hasattr(self.raw, 'get_fs_filename'): if hasattr(self._formdata, 'get_file_base_url') or self._base_formdata: keys.append('url') if self.raw.get_fs_filename(): keys.append('file_size') return keys @property def url(self): if 'url' not in self.inspect_keys(): return None if self._base_formdata: return self._field.get_download_url(formdata=self._base_formdata, file_value=self.raw) return self._field.get_download_url(formdata=self._formdata, **self._field_kwargs) @property def file_size(self): return self.raw.get_file_size() if self.raw else None def __len__(self): return len(self.raw.base_filename) if self.raw and self.raw.base_filename else 0 class LazyBlockDataVar(LazyFormDataVar): def __init__( self, fields, data, formdata=None, parent_field=None, parent_field_index=0, base_formdata=None ): super().__init__(fields, data, formdata=formdata) self.parent_field = parent_field self.parent_field_index = parent_field_index self.base_formdata = base_formdata def get_field_kwargs(self, field): kwargs = super().get_field_kwargs(field) kwargs['parent_field'] = self.parent_field kwargs['parent_field_index'] = self.parent_field_index kwargs['base_formdata'] = self.base_formdata return kwargs class LazyFieldVarBlock(LazyFieldVar): def inspect_keys(self): if self._field.max_items and self._field.max_items > 1: data = self._formdata.data.get(self._field.id)['data'] return [str(x) for x in range(len(data))] else: return ['var'] def get_value(self): return self._data.get(str(self._field.id)) def get_iterable_value(self): return list(self) def __str__(self): return self._data.get('%s_display' % self._field.id) or '---' def __getitem__(self, key): try: int(key) except ValueError: return super().__getitem__(key) try: data = self._formdata.data.get(self._field.id)['data'][int(key)] except IndexError: raise KeyError(key) try: block_field = self._field.block except KeyError: # block was deleted, ignore return None return LazyBlockDataVar( block_field.fields, data, formdata=self._formdata, parent_field=self._field, parent_field_index=int(key), base_formdata=self._base_formdata, ) def __len__(self): data = self._formdata.data.get(self._field.id)['data'] return len(data) @property def var(self): # alias when there's a single item return self[0] def __iter__(self): data = self._formdata.data.get(self._field.id)['data'] for i in range(len(data)): yield self[i] def getlist(self, key): # called by |getlist filter for field in self._field.block.fields: if field.varname == key: break else: try: value = [CompatibilityNamesDict({'X': x})[f'X_{key}'] for x in self] except KeyError: raise AttributeError(str(key)) return value return [data.get(field.id) for data in self._formdata.data.get(self._field.id)['data']] def getlistdict(self, keys): fields = [] for key in keys: matchings_fields = [field for field in self._field.block.fields if field.varname == key] matching_field_id = matchings_fields[0].id if matchings_fields else None fields.append((key, matching_field_id)) return [ {key: data.get(field_id) if field_id else None for key, field_id in fields} for data in self._formdata.data.get(self._field.id)['data'] ] class LazyUser: def __init__(self, user): self._user = user def inspect_keys(self): return ['display_name', 'email', 'var', 'nameid', 'has_deleted_account'] @property def display_name(self): return self._user.display_name @property def email(self): return self._user.email @property def has_deleted_account(self): return bool(self._user.deleted_timestamp) @property def var(self): return LazyFormDataVar(self._user.get_formdef().fields, self._user.form_data) @property def admin_access(self): return self._user.can_go_in_admin() @property def backoffice_access(self): return self._user.can_go_in_backoffice() @property def name_identifier(self): d = {} for i, name_identifier in enumerate(self._user.name_identifiers): d[str(i)] = name_identifier return d @property def nameid(self): return self._user.nameid def get_value(self): return self._user def __getitem__(self, key): return getattr(self, key) def __getattr__(self, attr): try: return super().__getattr__(attr) except AttributeError: return getattr(self._user, attr) class LazyRequest: def __init__(self, request): self._request = request @property def quixote_request(self): # compatibility return self._request @property def GET(self): return self._request.django_request.GET @property def META(self): return self._request.django_request.META @property def is_in_backoffice(self): return self._request.is_in_backoffice() @property def is_from_mobile(self): return self._request.is_from_mobile() @property def method(self): return self._request.method @property def user(self): return self._request.user @property def view_name(self): return getattr(self._request, 'view_name', None) class LazyFormDefOptions(LazyFormDataVar): def __init__(self, formdef): self._formdef = formdef try: fields = self._formdef.workflow.variables_formdef.fields except AttributeError: fields = [] data = self._formdef.workflow_options or {} for field in fields: # change field IDs as options are stored in data with their # varnames, not id. field.id = field.varname or field.id if hasattr(field, 'default_value') and data.get(field.varname) is None: if isinstance(field.default_value, str): data[field.varname] = field.convert_value_from_str(field.default_value) else: data[field.varname] = field.default_value super().__init__(fields, data) def inspect_keys(self): # don't display "parameter replacement" options return [x for x in self.varnames if '*' not in x] class CardsSource: @classmethod def get_substitution_variables(cls): return {'cards': cls()} def inspect_keys(self): return [] def __getattr__(self, attr): if attr == 'inspect_collapse': return False try: return LazyFormDef(CardDef.get_by_urlname(attr, use_cache=True)) except KeyError: raise CardDefDoesNotExist(attr) class FormsSource: @classmethod def get_substitution_variables(cls): return {'forms': cls()} def inspect_keys(self): return [] def __getattr__(self, attr): if attr == 'inspect_collapse': return False try: return LazyFormDef(FormDef.get_by_urlname(attr, use_cache=True)) except KeyError: raise FormDefDoesNotExist(attr)