# w.c.s. - web application for online forms # Copyright (C) 2005-2019 Entr'ouvert # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, see . from quixote import get_publisher, get_request, get_response, get_session from wcs.formdata import FormData from .qommon import _ from .qommon.afterjobs import AfterJob from .sql_criterias import Equal, Null, Or, get_field_id class CardData(FormData): uuid = None def get_data_source_structured_item( self, digest_key='default', group_by=None, with_related_urls=False, with_files_urls=False ): if not self.digests: if digest_key == 'default': summary = _('Digest (default) not defined') else: summary = _('Digest (custom view "%s") not defined') % digest_key.replace('custom-view:', '') get_publisher().record_error(summary, formdata=self) item = { 'id': self.id_display if self.formdef.id_template else self.id, 'text': (self.digests or {}).get(digest_key) or '', } if with_related_urls: edit_related_url = self.get_edit_related_url() if edit_related_url: item['edit_related_url'] = edit_related_url view_related_url = self.get_view_related_url() if view_related_url: item['view_related_url'] = view_related_url if group_by: item['group_by'] = self.data.get(f'{group_by}_display') or self.data.get(str(group_by)) for field in self.formdef.get_all_fields(): if not field.varname or field.varname in ('id', 'text'): continue value = self.data and self.data.get(field.id) if with_files_urls and hasattr(value, 'file_digest'): item['%s_url' % field.varname] = self.get_file_by_token_url(value.file_digest()) if isinstance(value, str): item[field.varname] = value return item def get_edit_related_url(self): wf_status = self.get_status() if wf_status is None: return for _item in wf_status.items: if not _item.key == 'editable': continue if not _item.check_auth(self, get_request().user): continue return ( self.get_url( backoffice=get_request().is_in_backoffice(), include_category=True, language=get_publisher().current_language, ) + 'wfedit-%s' % _item.id ) def get_view_related_url(self): if not self.formdef.is_user_allowed_read(get_request().user, self): return return self.get_url(backoffice=True) def get_display_label(self, digest_key='default'): return (self.digests or {}).get(digest_key) or self.get_display_name() def get_author_qualification(self): return None def get_file_base_url(self): return '%sdownload' % self.get_api_url() def just_created(self): super().just_created() if self.submission_agent_id: self.evolution[0].who = self.submission_agent_id @classmethod def get_submission_channels(cls): return {'web': _('Web'), 'file-import': _('File Import')} @classmethod def get_by_uuid(cls, value): try: return cls.select([Equal('uuid', value)], limit=1)[0] except IndexError: raise KeyError(value) def get_file_by_token_url(self, file_digest): context = { 'carddef_slug': self.formdef.url_name, 'data_id': self.id, 'file_digest': file_digest, } token = get_session().create_token('card-file-by-token', context) return '/api/card-file-by-token/%s' % token.id def update_related(self): if self.is_draft(): return if self.formdef.reverse_relations: job = UpdateRelationsAfterJob(carddata=self) if get_response(): job._update_key = (self._formdef.id, self.id) # do not register/run job if an identical job is already planned if job._update_key not in ( getattr(x, '_update_key', None) for x in get_response().after_jobs or [] ): job.store() get_response().add_after_job(job) else: job.execute() self._has_changed_digest = False class UpdateRelationsAfterJob(AfterJob): label = _('Updating relations') def __init__(self, carddata): super().__init__(carddef_id=carddata.formdef.id, carddata_id=carddata.id) def execute(self): from .carddef import CardDef from .formdef import FormDef if getattr(get_publisher(), '_update_related_seen', None) is None: get_publisher()._update_related_seen = set() # keep track of objects that have been updated, to avoid cycles update_related_seen = get_publisher()._update_related_seen try: carddef = CardDef.cached_get(self.kwargs['carddef_id']) carddata = carddef.data_class().get(self.kwargs['carddata_id']) except KeyError: # card got removed (probably the afterjob met some unexpected delay), ignore. return klass = {'carddef': CardDef, 'formdef': FormDef} publisher = get_publisher() # check all known reverse relations for obj_ref in {x['obj'] for x in carddef.reverse_relations}: obj_type, obj_slug = obj_ref.split(':') obj_class = klass.get(obj_type) try: objdef = obj_class.get_by_slug(obj_slug, use_cache=True) except KeyError: continue criterias = [] fields = [] # get fields referencing the card model (only item and items fields, as string # field with data source is just for completion, and computed field with data # source, do not store a display value. for field in objdef.iter_fields(include_block_fields=True): if field.key not in ('item', 'items'): continue data_source = getattr(field, 'data_source', None) if not data_source: continue data_source_type = data_source.get('type') if ( not data_source_type.startswith('carddef:') or data_source_type.split(':')[1] != carddef.slug ): continue fields.append(field) criterias.append(Equal(get_field_id(field), carddata.identifier, field=field)) if not criterias: continue def update_data(field, data): display_value = data.get(f'{field.id}_display') field.set_value(data, data.get(field.id)) return bool(data.get(f'{field.id}_display') != display_value) # look for all formdata, including drafts, excluding anonymised select_criterias = [Null('anonymised'), Or(criterias)] for objdata in objdef.data_class().select_iterator(clause=select_criterias, itersize=200): objdata_seen_key = f'{objdata.formdef.xml_root_node}:{objdata.formdef.slug}:{objdata.id}' if objdata_seen_key in update_related_seen: # do not allow updates to cycle back continue publisher.reset_formdata_state() publisher.substitutions.feed(objdata.formdef) publisher.substitutions.feed(objdata) objdata_changed = False for field in fields: if getattr(field, 'block_field', None): if objdata.data.get(field.block_field.id): blockdata_changed = False for block_row_data in objdata.data[field.block_field.id]['data']: blockdata_changed |= update_data(field, block_row_data) if blockdata_changed: # if block data changed, maybe block digest changed too update_data(field.block_field, objdata.data) objdata_changed |= blockdata_changed else: objdata_changed |= update_data(field, objdata.data) if objdata_changed: update_related_seen.add(objdata_seen_key) objdata.store()