wcs/wcs/carddata.py

232 lines
9.1 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2019 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from quixote import get_publisher, get_request, get_response, get_session
from wcs.formdata import FormData
from .qommon import _
from .qommon.afterjobs import AfterJob
from .sql_criterias import Equal, Null, Or, get_field_id
class CardData(FormData):
uuid = None
def get_data_source_structured_item(
self, digest_key='default', group_by=None, with_related_urls=False, with_files_urls=False
):
if not self.digests:
if digest_key == 'default':
summary = _('Digest (default) not defined')
else:
summary = _('Digest (custom view "%s") not defined') % digest_key.replace('custom-view:', '')
get_publisher().record_error(summary, formdata=self)
item = {
'id': self.id_display if self.formdef.id_template else self.id,
'text': (self.digests or {}).get(digest_key) or '',
}
if with_related_urls:
edit_related_url = self.get_edit_related_url()
if edit_related_url:
item['edit_related_url'] = edit_related_url
view_related_url = self.get_view_related_url()
if view_related_url:
item['view_related_url'] = view_related_url
if group_by:
item['group_by'] = self.data.get(f'{group_by}_display') or self.data.get(str(group_by))
for field in self.formdef.get_all_fields():
if not field.varname or field.varname in ('id', 'text'):
continue
value = self.data and self.data.get(field.id)
if with_files_urls and hasattr(value, 'file_digest'):
item['%s_url' % field.varname] = self.get_file_by_token_url(value.file_digest())
if isinstance(value, str):
item[field.varname] = value
return item
def get_edit_related_url(self):
wf_status = self.get_status()
if wf_status is None:
return
for _item in wf_status.items:
if not _item.key == 'editable':
continue
if not _item.check_auth(self, get_request().user):
continue
return (
self.get_url(
backoffice=get_request().is_in_backoffice(),
include_category=True,
language=get_publisher().current_language,
)
+ 'wfedit-%s' % _item.id
)
def get_view_related_url(self):
if not self.formdef.is_user_allowed_read(get_request().user, self):
return
return self.get_url(backoffice=True)
def get_display_label(self, digest_key='default'):
return (self.digests or {}).get(digest_key) or self.get_display_name()
def get_author_qualification(self):
return None
def get_file_base_url(self):
return '%sdownload' % self.get_api_url()
def just_created(self):
super().just_created()
if self.submission_agent_id:
self.evolution[0].who = self.submission_agent_id
@classmethod
def get_submission_channels(cls):
return {'web': _('Web'), 'file-import': _('File Import')}
@classmethod
def get_by_uuid(cls, value):
try:
return cls.select([Equal('uuid', value)], limit=1)[0]
except IndexError:
raise KeyError(value)
def get_file_by_token_url(self, file_digest):
context = {
'carddef_slug': self.formdef.url_name,
'data_id': self.id,
'file_digest': file_digest,
}
token = get_session().create_token('card-file-by-token', context)
return '/api/card-file-by-token/%s' % token.id
def update_related(self):
if self.is_draft():
return
if self.formdef.reverse_relations:
job = UpdateRelationsAfterJob(carddata=self)
if get_response():
job._update_key = (self._formdef.id, self.id)
# do not register/run job if an identical job is already planned
if job._update_key not in (
getattr(x, '_update_key', None) for x in get_response().after_jobs or []
):
job.store()
get_response().add_after_job(job)
else:
job.execute()
self._has_changed_digest = False
class UpdateRelationsAfterJob(AfterJob):
label = _('Updating relations')
def __init__(self, carddata):
super().__init__(carddef_id=carddata.formdef.id, carddata_id=carddata.id)
def execute(self):
from .carddef import CardDef
from .formdef import FormDef
if getattr(get_publisher(), '_update_related_seen', None) is None:
get_publisher()._update_related_seen = set()
# keep track of objects that have been updated, to avoid cycles
update_related_seen = get_publisher()._update_related_seen
try:
carddef = CardDef.cached_get(self.kwargs['carddef_id'])
carddata = carddef.data_class().get(self.kwargs['carddata_id'])
except KeyError:
# card got removed (probably the afterjob met some unexpected delay), ignore.
return
klass = {'carddef': CardDef, 'formdef': FormDef}
publisher = get_publisher()
# check all known reverse relations
for obj_ref in {x['obj'] for x in carddef.reverse_relations}:
obj_type, obj_slug = obj_ref.split(':')
obj_class = klass.get(obj_type)
try:
objdef = obj_class.get_by_slug(obj_slug, use_cache=True)
except KeyError:
continue
criterias = []
fields = []
# get fields referencing the card model (only item and items fields, as string
# field with data source is just for completion, and computed field with data
# source, do not store a display value.
for field in objdef.iter_fields(include_block_fields=True):
if field.key not in ('item', 'items'):
continue
data_source = getattr(field, 'data_source', None)
if not data_source:
continue
data_source_type = data_source.get('type')
if (
not data_source_type.startswith('carddef:')
or data_source_type.split(':')[1] != carddef.slug
):
continue
fields.append(field)
criterias.append(Equal(get_field_id(field), carddata.identifier, field=field))
if not criterias:
continue
def update_data(field, data):
display_value = data.get(f'{field.id}_display')
field.set_value(data, data.get(field.id))
return bool(data.get(f'{field.id}_display') != display_value)
# look for all formdata, including drafts, excluding anonymised
select_criterias = [Null('anonymised'), Or(criterias)]
for objdata in objdef.data_class().select_iterator(clause=select_criterias, itersize=200):
objdata_seen_key = f'{objdata.formdef.xml_root_node}:{objdata.formdef.slug}:{objdata.id}'
if objdata_seen_key in update_related_seen:
# do not allow updates to cycle back
continue
publisher.reset_formdata_state()
publisher.substitutions.feed(objdata.formdef)
publisher.substitutions.feed(objdata)
objdata_changed = False
for field in fields:
if getattr(field, 'block_field', None):
if objdata.data.get(field.block_field.id):
blockdata_changed = False
for block_row_data in objdata.data[field.block_field.id]['data']:
blockdata_changed |= update_data(field, block_row_data)
if blockdata_changed:
# if block data changed, maybe block digest changed too
update_data(field.block_field, objdata.data)
objdata_changed |= blockdata_changed
else:
objdata_changed |= update_data(field, objdata.data)
if objdata_changed:
update_related_seen.add(objdata_seen_key)
objdata.store()