1682 lines
62 KiB
Python
1682 lines
62 KiB
Python
#
|
|
# combo - content management system
|
|
# Copyright (C) 2014-2015 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import collections
|
|
import copy
|
|
import logging
|
|
import urllib.parse
|
|
|
|
from django.conf import settings
|
|
from django.contrib.postgres.fields import JSONField
|
|
from django.db import models
|
|
from django.forms import Select
|
|
from django.forms import models as model_forms
|
|
from django.template import Context, RequestContext, Template, TemplateSyntaxError, VariableDoesNotExist
|
|
from django.utils.html import escape
|
|
from django.utils.safestring import mark_safe
|
|
from django.utils.text import slugify
|
|
from django.utils.translation import gettext_lazy as _
|
|
from requests.exceptions import RequestException
|
|
|
|
from combo.data.library import register_cell_class
|
|
from combo.data.models import CellBase, Page
|
|
from combo.utils import NothingInCacheException, requests
|
|
from combo.utils.misc import is_portal_agent
|
|
|
|
from .utils import get_wcs_json, get_wcs_services, is_wcs_enabled
|
|
|
|
# Translatable messages keyed by the reason codes that the cells of this
# module pass to mark_as_invalid().
invalid_reason_codes = {
    # form link cells
    'wcs_form_not_defined': _('No form set'),
    'wcs_form_not_found': _('Invalid form'),
    # card cells
    'wcs_card_not_defined': _('No card model set'),
    'wcs_card_not_found': _('Invalid card model'),
    'wcs_card_relation_not_found': _('Invalid Card Identifier'),
    # category cells
    'wcs_category_not_defined': _('No category set'),
    'wcs_category_not_found': _('Invalid category'),
    # site selection and data retrieval
    'wcs_site_not_found': _('Invalid site'),
    'wcs_data_failure': _('Failed to get data'),
}
|
|
|
|
|
|
def get_formdef_css_classes(formdef):
    """Build the list of CSS class names describing a form definition.

    ``formdef`` is a mapping (read with ``.get()``); the classes are emitted
    in a fixed order: redirection flag, authentication flag, one class per
    required authentication context, then one class per (slugified) keyword.
    Missing or ``None`` lists are treated as empty.
    """
    flag_classes = (
        ('redirection', 'is-redirection'),
        ('authentication_required', 'required-authentication'),
    )
    css_classes = [css_class for key, css_class in flag_classes if formdef.get(key)]
    css_classes.extend(
        'required-%s-authentication' % authn_context
        for authn_context in formdef.get('required_authentication_contexts') or []
    )
    css_classes.extend('keyword-%s' % slugify(keyword) for keyword in formdef.get('keywords') or [])
    return css_classes
|
|
|
|
|
|
def is_a_bot(request):
    """Tell whether ``request`` looks like it was sent by a bot.

    Returns ``True`` when the ``User-Agent`` header contains "bot"
    (case-insensitive).  Returns ``False`` otherwise, including when
    ``request`` is missing or carries no ``User-Agent`` header, so callers
    always get a real boolean rather than the falsy request itself.
    """
    if not request:
        return False
    user_agent = request.headers.get('User-Agent') or ''
    return 'bot' in user_agent.lower()
|
|
|
|
|
|
@register_cell_class
class WcsFormCell(CellBase):
    """Cell linking to a single wcs form.

    The title, URL and JSON description of the referenced form are fetched
    from the wcs site and cached on the cell each time it is saved.
    """

    # '<wcs site key>:<form slug>'
    formdef_reference = models.CharField(_('Form'), max_length=150)

    cached_title = models.CharField(_('Title'), max_length=150)
    cached_url = models.URLField(_('URL'))
    cached_json = JSONField(blank=True, default=dict)

    default_template_name = 'combo/wcs/form.html'
    add_as_link_label = _('add a form link')
    add_link_label = _('New form link')
    edit_link_label = _('Edit form link')
    add_as_link_code = 'form-link'
    invalid_reason_codes = invalid_reason_codes

    is_enabled = classmethod(is_wcs_enabled)

    class Meta:
        verbose_name = _('Form Link')

    def get_default_form_class(self):
        """Return the form class used to edit this cell."""
        from .forms import WcsFormCellForm

        return WcsFormCellForm

    def get_form_class_for_link_list_cell(self):
        """Return the form class used when the cell is part of a link list."""
        from .forms import WcsFormForLinkListCellForm

        return WcsFormForLinkListCellForm

    def save(self, *args, **kwargs):
        """Save the cell, then refresh the cached form data and validity.

        When ``update_fields`` is given the cache is not refreshed; this is
        what stops the nested ``self.save(update_fields=...)`` call below
        from recursing.
        """
        if 'update_fields' in kwargs:
            # don't populate the cache
            return super().save(*args, **kwargs)

        def populate_cache():
            if self.formdef_reference:
                wcs_key, form_slug = self.formdef_reference.split(':')
                wcs_site = get_wcs_services().get(wcs_key)
                forms_response_json = get_wcs_json(wcs_site, 'api/formdefs/')

                if not forms_response_json or forms_response_json.get('err') == 1:
                    # can not retrieve data, don't report cell as invalid
                    self.mark_as_valid()
                    return

                form_found = False
                for form in forms_response_json.get('data') or []:
                    slug = form.get('slug')
                    if slug == form_slug:
                        self.cached_title = form.get('title')
                        self.cached_url = form.get('url')
                        self.cached_json = form
                        self.save(update_fields=['cached_title', 'cached_url', 'cached_json'])
                        form_found = True
                        break
                if form_found:
                    self.mark_as_valid()
                    return
                else:
                    return self.mark_as_invalid('wcs_form_not_found')
            else:
                return self.mark_as_invalid('wcs_form_not_defined')

        super().save(*args, **kwargs)
        populate_cache()

    def get_cell_extra_context(self, context):
        """Return the template context: slug, title, url and cached form data.

        Every key of the cached JSON that is not already in the context is
        copied into it as well.
        """
        request = context.get('request')
        context = super().get_cell_extra_context(context)
        context['slug'] = self.formdef_reference.split(':')[-1]
        context['title'] = self.cached_title
        context['url'] = self.cached_url
        # the cancel URL is built from the request, so it can only be added
        # when there is one (and it is skipped for bots)
        if request and not is_a_bot(request):
            context['url'] += 'tryauth?cancelurl=%s' % urllib.parse.quote(request.build_absolute_uri())
        if self.cached_json:
            # a null description must not be rendered as the string "None"
            context['description'] = mark_safe(self.cached_json.get('description') or '')
            context['css_classes'] = get_formdef_css_classes(self.cached_json)
            for attribute in self.cached_json:
                if attribute not in context:
                    context[attribute] = self.cached_json.get(attribute)
        return context

    def get_additional_label(self):
        """Return the cached form title, or None when there is none."""
        if not self.cached_title:
            return
        return self.cached_title

    def render_for_search(self):
        """Return an empty string: the cell has no content of its own to index."""
        return ''

    def get_external_links_data(self):
        """Return a one-item list describing the form link, or an empty list.

        The item carries the cached ``url`` and ``title`` plus a ``text``
        made of the form description and keywords.
        """
        if not (self.cached_url and self.cached_title):
            return []
        text = ''
        if self.cached_json:
            # both values may be present but null in the cached JSON
            description = self.cached_json.get('description') or ''
            keywords = self.cached_json.get('keywords') or []
            text = ' '.join([description, ' '.join(keywords)]).strip()
        return [
            {
                'url': self.cached_url,
                'title': self.cached_title,
                'text': text,
            }
        ]

    def get_slug_for_asset(self):
        return self.formdef_reference

    def get_label_for_asset(self):
        return str(self)

    def get_asset_slot_key(self, key):
        # for legacy
        return 'wcs:form:%s:%s' % (key, self.get_slug_for_asset())

    def get_asset_slot_templates(self):
        # for legacy
        if settings.WCS_FORM_ASSET_SLOTS:
            return settings.WCS_FORM_ASSET_SLOTS
        return super().get_asset_slot_templates()
|
|
|
|
|
|
class WcsCommonCategoryCell(CellBase):
    """Abstract base for cells bound to one wcs category.

    The category title, description and URL are cached on the cell each
    time it is saved.
    """

    is_enabled = classmethod(is_wcs_enabled)
    # '<wcs site key>:<category slug>'
    category_reference = models.CharField(_('Category'), max_length=150)

    cached_title = models.CharField(_('Title'), max_length=150)
    cached_description = models.TextField(_('Description'), blank=True)
    cached_url = models.URLField(_('Cached URL'))

    invalid_reason_codes = invalid_reason_codes

    class Meta:
        abstract = True

    def save(self, *args, **kwargs):
        """Save the cell, then refresh the cached category data and validity.

        Saves restricted with ``update_fields`` skip the refresh.
        """
        if 'update_fields' in kwargs:
            # don't populate the cache
            return super().save(*args, **kwargs)

        def populate_cache():
            if not self.category_reference:
                return self.mark_as_invalid('wcs_category_not_defined')

            wcs_key, category_slug = self.category_reference.split(':')
            wcs_site = get_wcs_services().get(wcs_key)
            categories_response_json = get_wcs_json(wcs_site, 'api/categories/')

            if not categories_response_json or categories_response_json.get('err') == 1:
                # can not retrieve data, don't report cell as invalid
                self.mark_as_valid()
                return

            # first category carrying the expected slug, if any
            matching_category = next(
                (
                    category
                    for category in categories_response_json.get('data') or []
                    if category.get('slug') == category_slug
                ),
                None,
            )
            if matching_category is None:
                return self.mark_as_invalid('wcs_category_not_found')

            self.cached_title = matching_category.get('title')
            self.cached_description = matching_category.get('description') or ''
            self.cached_url = matching_category.get('url')
            self.save(update_fields=['cached_title', 'cached_description', 'cached_url'])
            self.mark_as_valid()

        super().save(*args, **kwargs)
        populate_cache()

    def get_additional_label(self):
        """Return the cached category title, or None when there is none."""
        return self.cached_title if self.cached_title else None

    def get_slug_for_asset(self):
        return self.category_reference

    def get_label_for_asset(self):
        return str(self)

    def get_asset_slot_key(self, key):
        # for legacy
        asset_slug = self.get_slug_for_asset()
        return 'wcs:category:%s:%s' % (key, asset_slug)

    def get_asset_slot_templates(self):
        # for legacy
        legacy_slots = settings.WCS_CATEGORY_ASSET_SLOTS
        if legacy_slots:
            return legacy_slots
        return super().get_asset_slot_templates()
|
|
|
|
|
|
@register_cell_class
class WcsCategoryCell(WcsCommonCategoryCell):
    """Cell linking to a wcs category."""

    default_template_name = 'combo/wcs/category.html'

    class Meta:
        verbose_name = _('Category Link')

    def get_default_form_class(self):
        """Return the form class used to edit this cell."""
        from .forms import WcsCategoryCellForm

        return WcsCategoryCellForm

    def get_cell_extra_context(self, context):
        """Add the category slug and cached title, description and url.

        Nothing is added when no category is set.
        """
        context = super().get_cell_extra_context(context)
        if self.category_reference:
            context.update(
                {
                    'slug': self.category_reference.split(':')[-1],
                    'title': self.cached_title,
                    'description': self.cached_description,
                    'url': self.cached_url,
                }
            )
        return context
|
|
|
|
|
|
class WcsBlurpMixin:
    """Mixin fetching data from the API of one or all configured wcs sites.

    Users of the mixin provide ``wcs_site`` (a site slug, or a falsy value
    for "all sites"), ``variable_name`` and either ``api_url`` or their own
    ``get_api_url()``.
    """

    is_enabled = classmethod(is_wcs_enabled)
    cache_duration = 5
    api_url = None
    # report the cell as invalid when every site answers with a 4xx status
    warn_on_4xx = True
    invalid_reason_codes = invalid_reason_codes

    def get_api_url(self, context):
        """Return the API URL to call; the class-level ``api_url`` by default."""
        return self.api_url

    def get_api_url_for_site(self, context, wcs_slug):
        """Return the API URL for a given site; a falsy value skips the site."""
        return self.get_api_url(context)

    def get_data(self, context):
        """Call the API of the relevant wcs site(s) and collect the answers.

        Returns a dict keyed by site slug.  Each value is a deep copy of the
        site settings completed with ``base_url`` and ``slug`` and, when the
        call succeeded, ``data`` (every item being tagged with
        ``site_slug``).  Sites whose call failed stay in the result without
        a ``data`` key; sites with no API URL are left out.

        Also updates the validity of the cell according to the HTTP statuses
        that were received.
        """
        if context.get('placeholder_search_mode'):
            # don't call webservices when we're just looking for placeholders
            return {}
        if self.wcs_site:
            try:
                wcs_sites = {self.wcs_site: get_wcs_services()[self.wcs_site]}
            except KeyError:
                # in case of the site disappeared from settings
                return {}
        else:
            wcs_sites = get_wcs_services()

        result = {}
        returns = set()
        for wcs_slug, _wcs_site in wcs_sites.items():
            api_url = self.get_api_url_for_site(context, wcs_slug)
            if not api_url:
                # nothing to call for this site
                continue
            url = _wcs_site.get('url')
            if not url.endswith('/'):
                url += '/'
            # work on a copy so the shared settings are never modified
            wcs_site = copy.deepcopy(_wcs_site)
            result[wcs_slug] = wcs_site
            wcs_site['base_url'] = url
            wcs_site['slug'] = wcs_slug

            try:
                # NOTE(review): with raise_if_not_cached the call presumably
                # raises when nothing is cached; that is not handled here and
                # is left to the caller - confirm against combo.utils.requests
                response = requests.get(
                    api_url,
                    remote_service=wcs_site,
                    user=None
                    if getattr(self, 'without_user', False)
                    else getattr(context.get('request'), 'user', None),
                    without_user=getattr(self, 'without_user', False),
                    cache_duration=self.cache_duration,
                    raise_if_not_cached=not (context.get('synchronous')),
                    log_errors=False,
                )
                returns.add(response.status_code)
                response.raise_for_status()
            except RequestException:
                continue
            try:
                json_response = response.json()
            except ValueError:
                # the body is not JSON: skip this site like any other failure
                # instead of letting the error break the whole cell
                continue
            if json_response.get('err', 0) == 0:
                wcs_site['data'] = json_response['data']
            else:
                # skip errors
                continue
            # and mark all items with the site info
            for item in wcs_site['data']:
                item['site_slug'] = wcs_slug

        if returns and 200 not in returns:  # not a single valid answer
            logging.debug('failed to get data from any %s (%r)', api_url, returns)
            if all(400 <= r < 500 for r in returns) and self.warn_on_4xx:
                # only 4xx errors, not a user cell, report the cell as invalid
                self.mark_as_invalid('wcs_data_failure', force=False)
            else:
                self.mark_as_valid()
        else:
            self.mark_as_valid()

        return result

    def get_cell_extra_context(self, context):
        """Return the fetched data under the ``variable_name`` key."""
        return {self.variable_name: self.get_data(context)}
|
|
|
|
|
|
class WcsDataBaseCell(CellBase, WcsBlurpMixin):
    """Abstract base for cells showing data from one or all wcs sites."""

    is_enabled = classmethod(is_wcs_enabled)
    # slug of the selected site; blank means all sites
    wcs_site = models.CharField(_('Site'), max_length=50, blank=True)

    class Meta:
        abstract = True

    def get_additional_label(self):
        """Return the title of the selected site.

        The label is empty when fewer than two sites are configured, and
        "All Sites" when the selected site is not one of them.
        """
        site_titles = {slug: site.get('title') for slug, site in get_wcs_services().items()}
        if len(site_titles) < 2:
            return ''
        if self.wcs_site not in site_titles:
            return _('All Sites')
        return site_titles[self.wcs_site]

    def get_form_fields(self):
        """Return the editable fields: none when exactly one site exists."""
        return [] if len(get_wcs_services()) == 1 else ['wcs_site']

    def get_form_widgets(self):
        """Return a site selector widget, unless exactly one site exists."""
        if len(get_wcs_services()) == 1:
            return {}
        # "All" comes first, then the sites ordered by title
        site_choices = sorted(
            ((slug, site.get('title')) for slug, site in get_wcs_services().items()),
            key=lambda choice: choice[1],
        )
        return {'wcs_site': Select(choices=[('', _('All'))] + site_choices)}

    def get_default_form_class(self):
        """Return a model form limited to the editable fields, or None."""
        fields = self.get_form_fields()
        if fields:
            return model_forms.modelform_factory(
                self.__class__, fields=fields, widgets=self.get_form_widgets()
            )
        return None

    def get_cell_extra_context(self, context):
        """Merge the context of the parent classes with the fetched wcs data."""
        extra_context = super().get_cell_extra_context(context)
        wcs_context = WcsBlurpMixin.get_cell_extra_context(self, context)
        extra_context.update(wcs_context)
        return extra_context
|
|
|
|
|
|
class WcsUserDataBaseCell(WcsDataBaseCell):
    """Abstract base for cells showing data of the logged-in user."""

    # 4xx answers do not mark the cell as invalid
    warn_on_4xx = False
    user_dependant = True

    class Meta:
        abstract = True

    def is_visible(self, request, **kwargs):
        """Hide the cell when the request has no user or an anonymous one."""
        user = getattr(request, 'user', None)
        if user and not user.is_anonymous:
            return super().is_visible(request, **kwargs)
        return False
|
|
|
|
|
|
class CategoriesAndWcsSiteValidityMixin:
    """Validity check for cells having ``wcs_site`` and ``categories`` fields."""

    def check_validity(self):
        """Mark the cell as valid or invalid.

        The cell is invalid when its site is set but not configured, or when
        one of its selected categories ('<site key>:<category slug>') is
        missing from the categories returned by the site.  Categories of
        sites whose data cannot be retrieved are not checked.
        """
        if self.wcs_site and self.wcs_site not in get_wcs_services():
            self.mark_as_invalid('wcs_site_not_found')
            return

        # tolerate an empty/None field value or a missing/null 'data' entry
        categories = (self.categories or {}).get('data') or []

        if not categories:
            self.mark_as_valid()
            return

        for category_reference in categories:
            wcs_key, category_slug = category_reference.split(':')
            wcs_site = get_wcs_services().get(wcs_key)
            categories_response_json = get_wcs_json(wcs_site, 'api/categories/')

            if not categories_response_json or categories_response_json.get('err') == 1:
                # can not retrieve data, don't report cell as invalid
                continue

            category_found = any(
                c.get('slug') == category_slug for c in categories_response_json.get('data') or []
            )
            if not category_found:
                self.mark_as_invalid('wcs_category_not_found')
                return

        self.mark_as_valid()
|
|
|
|
|
|
class CategoriesFilteringMixin:
    """Restrict the API calls of a cell to its selected categories."""

    def get_api_url_for_site(self, context, wcs_slug):
        """Return the API URL for ``wcs_slug`` with a category filter.

        Without selected categories the plain API URL is returned.  When
        categories are selected but none belongs to ``wcs_slug``, None is
        returned; otherwise a ``category_slugs`` parameter listing the
        slugs of that site is appended to the URL.
        """
        url = self.get_api_url(context)

        selected = self.categories.get('data') if self.categories else None
        if not selected:
            return url

        # group the '<site key>:<category slug>' references by site
        slugs_by_site = collections.defaultdict(list)
        for reference in selected:
            site_key, category_slug = reference.split(':')
            slugs_by_site[site_key].append(category_slug)

        site_slugs = slugs_by_site.get(wcs_slug)
        if not site_slugs:
            return None

        joiner = '&' if '?' in url else '?'
        return '%s%scategory_slugs=%s' % (url, joiner, ','.join(site_slugs))
|
|
|
|
|
|
@register_cell_class
class WcsCurrentFormsCell(CategoriesAndWcsSiteValidityMixin, CategoriesFilteringMixin, WcsUserDataBaseCell):
    """Cell listing the forms (current, done or both) of the user."""

    variable_name = 'user_forms'
    loading_message = _('Loading forms...')

    categories = JSONField(_('Categories'), blank=True, default=dict)
    custom_title = models.CharField(_('Custom Title'), max_length=150, blank=True)
    current_forms = models.BooleanField(_('Current Forms'), default=True)
    done_forms = models.BooleanField(_('Done Forms'), default=False)
    include_drafts = models.BooleanField(_('Include drafts'), default=False)
    include_forms_user_can_access = models.BooleanField(
        _('Include forms to which the user can access'), default=False
    )

    class Meta:
        verbose_name = _('User Forms')

    def get_default_form_class(self):
        """Return the form class used to edit this cell."""
        from .forms import WcsCurrentFormsCellForm

        return WcsCurrentFormsCellForm

    def get_api_url(self, context):
        """Build the API URL listing the forms of the concerned user.

        The per-user endpoint is used when the user has a name id, the
        generic one otherwise; the query string carries the status filter,
        the optional inclusions and a fixed limit and sort order.
        """
        user = self.get_concerned_user(context)
        user_name_id = user.get_name_id() if user and not user.is_anonymous else None
        if user_name_id:
            path = '/api/users/%s/forms' % user_name_id
        else:
            path = '/api/user/forms/'

        if self.done_forms:
            status = 'all' if self.current_forms else 'done'
        else:
            status = 'open'

        query = ['status=%s' % status]
        if self.include_forms_user_can_access:
            query.append('include-accessible=on')
        if self.include_drafts:
            query.append('include-drafts=on')
        query.append('limit=100&sort=desc')
        return '%s?%s' % (path, '&'.join(query))

    @property
    def default_template_name(self):
        """Template matching the kind of forms that are displayed."""
        if not self.done_forms:
            return 'combo/wcs/current_forms.html'
        if self.current_forms:
            return 'combo/wcs/user_all_forms.html'
        return 'combo/wcs/user_done_forms.html'

    def get_additional_label(self):
        """Describe the displayed forms, prefixed by the parent label if any."""
        initial_label = super().get_additional_label()

        if self.current_forms and self.done_forms:
            scope = 'all'
        elif self.done_forms:
            scope = 'done'
        else:
            scope = 'current'

        if self.include_drafts:
            label = {
                'all': _('All Forms and Drafts'),
                'done': _('Done Forms and Drafts'),
                'current': _('Current Forms and Drafts'),
            }[scope]
        else:
            label = {
                'all': _('All Forms'),
                'done': _('Done Forms'),
                'current': _('Current Forms'),
            }[scope]

        if not initial_label:
            return label
        return '%s - %s' % (initial_label, label)

    def get_cell_extra_context(self, context):
        """Add ``forms``, the flat list of the forms of all sites."""
        context = super().get_cell_extra_context(context)
        user_forms = context['user_forms']

        context['current_forms'] = user_forms  # legacy

        # regroup all forms in a flat list
        context['forms'] = []
        for wcs_site in user_forms:
            site_data = user_forms.get(wcs_site)
            if site_data and site_data.get('data'):
                context['forms'].extend(site_data['data'])

        return context
|
|
|
|
|
|
@register_cell_class
class WcsCurrentDraftsCell(CategoriesAndWcsSiteValidityMixin, CategoriesFilteringMixin, WcsUserDataBaseCell):
    """Cell listing the drafts of the user."""

    variable_name = 'current_drafts'
    default_template_name = 'combo/wcs/current_drafts.html'
    loading_message = _('Loading drafts...')

    categories = JSONField(_('Categories'), blank=True, default=dict)

    class Meta:
        verbose_name = _('Current Drafts')

    def get_default_form_class(self):
        """Return the form class used to edit this cell."""
        from .forms import WcsCurrentDraftsCellForm

        return WcsCurrentDraftsCellForm

    def get_api_url(self, context):
        """Return the drafts endpoint: per-user when the user has a name id."""
        user = self.get_concerned_user(context)
        user_name_id = user.get_name_id() if user and not user.is_anonymous else None
        if not user_name_id:
            return '/api/user/drafts'
        return '/api/users/%s/drafts' % user_name_id

    def get_cell_extra_context(self, context):
        """Add ``drafts``, the flat list of the drafts of all sites."""
        context = super().get_cell_extra_context(context)
        current_drafts = context['current_drafts']

        # regroup all drafts in a flat list
        context['drafts'] = []
        for wcs_site in current_drafts:
            site_data = current_drafts.get(wcs_site)
            if site_data and site_data.get('data'):
                context['drafts'].extend(site_data['data'])

        return context
|
|
|
|
|
|
@register_cell_class
class WcsFormsOfCategoryCell(WcsCommonCategoryCell, WcsBlurpMixin):
    """Cell listing the forms of a wcs category, with configurable ordering."""

    ordering = models.CharField(
        _('Order'),
        max_length=20,
        default='alpha',
        blank=False,
        choices=[
            ('alpha', _('Default (alphabetical)')),
            ('popularity', _('Popularity')),
            ('manual', _('Manual')),
        ],
    )
    # {'data': ['<category reference>:<form slug>', ...]} in display order
    manual_order = JSONField(
        blank=True,
        default=dict,
        verbose_name=_('Manual Order'),
        help_text=_('Use drag and drop to reorder forms'),
    )
    # forms beyond this limit are moved to the 'more_forms' context variable
    limit = models.PositiveSmallIntegerField(_('Limit'), null=True, blank=True)

    class Meta:
        verbose_name = _('Forms of Category')

    variable_name = 'forms'
    default_template_name = 'combo/wcs/forms_of_category.html'
    cache_duration = 120

    def get_default_form_class(self):
        """Return the form class used to edit this cell."""
        from .forms import WcsFormsOfCategoryCellForm

        return WcsFormsOfCategoryCellForm

    @property
    def wcs_site(self):
        """Site key, i.e. the first part of the category reference."""
        return self.category_reference.split(':')[0]

    def get_api_url(self, context):
        """Return the formdefs endpoint of the category.

        Counts are requested only when ordering by popularity.
        """
        api_url = '/api/categories/%s/formdefs/' % self.category_reference.split(':')[1]
        if self.ordering == 'popularity':
            api_url += '?include-count=on'
        return api_url

    def is_relevant(self, context):
        return bool(self.category_reference)

    def check_validity(self):
        """Mark the cell invalid when its category is missing from the site.

        Nothing is changed when no category is set or when the categories
        cannot be retrieved.
        """
        if not self.category_reference:
            return

        wcs_key, category_slug = self.category_reference.split(':')
        wcs_site = get_wcs_services().get(wcs_key)
        categories_response_json = get_wcs_json(wcs_site, 'api/categories/')

        if not categories_response_json or categories_response_json.get('err') == 1:
            # can not retrieve data, don't report cell as invalid
            return

        category_found = any(
            c.get('slug') == category_slug for c in categories_response_json.get('data') or []
        )
        if not category_found:
            self.mark_as_invalid('wcs_category_not_found')
            return

        self.mark_as_valid()

    def get_cell_extra_context(self, context):
        """Return the context with the ordered ``forms`` and ``more_forms``.

        ``forms`` holds the forms of the category, sorted according to
        ``ordering`` and cut at ``limit``; the forms beyond the limit go to
        ``more_forms``.  Each form gets a ``css_classes`` entry.
        """
        extra_context = super().get_cell_extra_context(context)
        extra_context.update(WcsBlurpMixin.get_cell_extra_context(self, context))
        extra_context['request_is_a_bot'] = is_a_bot(context.get('request'))
        if not self.category_reference:
            return extra_context
        extra_context['slug'] = self.category_reference.split(':')[-1]
        extra_context['title'] = self.cached_title
        extra_context['description'] = self.cached_description
        try:
            extra_context['forms'] = list(extra_context['forms'][self.wcs_site]['data'])
        except (KeyError, TypeError):
            # an error occurred when getting the data
            extra_context['forms'] = []

        # default sort is alphabetical, it's always done as this will serve as
        # secondary sort key (thanks to Python stable sort)
        # (a missing or null title sorts as an empty string instead of failing)
        extra_context['forms'] = sorted(extra_context['forms'], key=lambda x: x.get('title') or '')
        extra_context['more_forms'] = []

        if self.ordering == 'popularity':
            # a missing or null count is handled as 0 so that values stay comparable
            extra_context['forms'] = sorted(
                extra_context['forms'], key=lambda x: x.get('count') or 0, reverse=True
            )
        elif self.ordering == 'manual':
            if self.manual_order:
                # the 'data' entry may be missing or null
                manual_order = self.manual_order.get('data') or []
                for form in extra_context['forms']:
                    form_reference = '%s:%s' % (self.category_reference, form['slug'])
                    try:
                        form['order'] = manual_order.index(form_reference)
                    except ValueError:
                        # forms without a manual position go last
                        form['order'] = 9999
                extra_context['forms'] = sorted(extra_context['forms'], key=lambda x: x.get('order', 9999))

        for formdef in extra_context['forms']:
            formdef['css_classes'] = get_formdef_css_classes(formdef)

        if self.limit:
            if len(extra_context['forms']) > self.limit:
                extra_context['more_forms'] = extra_context['forms'][self.limit :]
                extra_context['forms'] = extra_context['forms'][: self.limit]

        return extra_context

    def render_for_search(self):
        """Return an empty string: the cell has no content of its own to index."""
        return ''

    def get_external_links_data(self):
        """Yield a ``url``/``title``/``text`` dict for each form of the category.

        The data is fetched synchronously; nothing is yielded when no
        category is set.
        """
        if not self.category_reference:
            return
        formdefs = self.get_data({'synchronous': True})
        for site in formdefs.values():
            for formdef in site.get('data') or []:
                # description and keywords may be present but null
                description = formdef.get('description') or ''
                keywords = formdef.get('keywords') or []
                text = ' '.join([description] + list(keywords))
                yield {'url': formdef['url'], 'title': formdef['title'], 'text': text}
|
|
|
|
|
|
@register_cell_class
class WcsCareFormsCell(CategoriesAndWcsSiteValidityMixin, CategoriesFilteringMixin, WcsDataBaseCell):
    """Cell listing the forms to process."""

    categories = JSONField(_('Categories'), blank=True, default=dict)
    custom_title = models.CharField(_('Custom Title'), max_length=150, blank=True)

    api_url = '/api/forms/?limit=10'
    variable_name = 'care_forms'
    default_template_name = 'combo/wcs/care_forms.html'
    # per-cell setting, editable from the "Advanced" manager tab
    cache_duration = models.PositiveIntegerField(_('Cache duration'), default=120, help_text=_('In seconds.'))
    user_dependant = True

    class Meta:
        verbose_name = _('Forms to process')

    def get_default_form_class(self):
        """Return the form class used to edit this cell."""
        from .forms import WcsCareFormsCellForm

        return WcsCareFormsCellForm

    def get_manager_tabs(self):
        """Insert an "Advanced" tab, in second position, for the cache duration."""
        advanced_tab = {
            'slug': 'advanced',
            'name': _('Advanced'),
            'fields': ['cache_duration'],
        }
        tabs = super().get_manager_tabs()
        tabs.insert(1, advanced_tab)
        return tabs

    def get_cell_extra_context(self, context):
        """Add the portal agent flag and, per site, its selected category slugs."""
        context = super().get_cell_extra_context(context)
        context['is_portal_agent'] = is_portal_agent()

        # (site key, category slug) pairs of the selected categories
        if self.categories:
            categories_filter = [
                tuple(category.split(':')) for category in self.categories.get('data', [])
            ]
        else:
            categories_filter = []

        care_forms = context['care_forms']
        for wcs_site in care_forms:
            site_data = care_forms.get(wcs_site)
            if not site_data:
                continue
            site_data['categories'] = [slug for key, slug in categories_filter if key == wcs_site]

        return context

    def save(self, *args, **kwargs):
        """Save the cell, then re-check its validity."""
        result = super().save(*args, **kwargs)
        self.check_validity()
        return result
|
|
|
|
|
|
@register_cell_class
class CategoriesCell(WcsDataBaseCell):
    """Cell listing the form categories."""

    api_url = '/api/categories/?full=on'
    variable_name = 'form_categories'
    default_template_name = 'combo/wcs/form_categories.html'
    cache_duration = 120

    class Meta:
        verbose_name = _('Form Categories')

    def check_validity(self):
        """Mark the cell invalid when its site is set but not configured."""
        site_is_usable = not self.wcs_site or self.wcs_site in get_wcs_services()
        if site_is_usable:
            self.mark_as_valid()
        else:
            self.mark_as_invalid('wcs_site_not_found')
|
|
|
|
|
|
class CardMixin:
    """Helpers shared by the cells bound to a card model.

    The ``carddef_reference`` of the cell is read as
    '<wcs site key>:<card slug>' with an optional ':<custom view>' part.
    """

    invalid_reason_codes = invalid_reason_codes

    def is_relevant(self, context):
        """The cell is relevant only when a card model is set."""
        return bool(self.carddef_reference)

    @property
    def wcs_site(self):
        """First part of the reference: the wcs site key."""
        site_key, _separator, _rest = self.carddef_reference.partition(':')
        return site_key

    @property
    def card_slug(self):
        """Second part of the reference: the card slug."""
        reference_parts = self.carddef_reference.split(':')
        return reference_parts[1]

    @property
    def card_custom_view(self):
        """Third part of the reference, or None when there is none."""
        reference_parts = self.carddef_reference.split(':')
        if len(reference_parts) > 2:
            return reference_parts[2]
        return None

    def get_additional_label(self):
        """Return the escaped custom title, else the cached title, else None."""
        custom_label = escape(self.custom_title)
        if custom_label:
            return custom_label
        return self.cached_title or None
|
|
|
|
|
|
@register_cell_class
class WcsCardsCell(CardMixin, WcsBlurpMixin, CellBase):
    """Cell listing the cards of a card model, optionally through a custom view."""

    carddef_reference = models.CharField(_('Card Model'), max_length=150)
    cached_title = models.CharField(_('Title'), max_length=150)
    custom_title = models.CharField(_('Custom Title'), max_length=150, blank=True)
    only_for_user = models.BooleanField(_('Limit to cards linked to the logged-in user'), default=False)
    without_user = models.BooleanField(_('Ignore the logged-in user'), default=False)
    limit = models.PositiveSmallIntegerField(
        _('Number of cards per page (default 10)'), null=True, blank=True
    )

    default_template_name = 'combo/wcs/cards.html'
    variable_name = 'card_objects'

    class Meta:
        verbose_name = _('Cards')

    def save(self, *args, **kwargs):
        """Save the cell, then refresh the cached title and validity.

        Saves restricted with ``update_fields`` skip the refresh.
        """
        super().save(*args, **kwargs)

        if 'update_fields' in kwargs:
            # don't populate the cache
            return

        def populate_cache():
            if not self.carddef_reference:
                return self.mark_as_invalid('wcs_card_not_defined')

            parts = self.carddef_reference.split(':')
            wcs_key, card_slug = parts[:2]
            wcs_site = get_wcs_services().get(wcs_key)
            card_models = get_wcs_json(wcs_site, 'api/cards/@list')

            if not card_models or card_models.get('err') == 1:
                # can not retrieve data, don't report cell as invalid
                self.mark_as_valid()
                return

            for card in card_models.get('data') or []:
                if card['slug'] != card_slug:
                    continue

                card_title = card['title']

                if len(parts) > 2:
                    # the reference names a custom view: it must exist too
                    custom_view = next(
                        (view for view in card.get('custom_views') or [] if view['id'] == parts[2]),
                        None,
                    )
                    if custom_view is None:
                        continue
                    card_title = '%s - %s' % (card_title, custom_view['text'])

                self.cached_title = card_title
                self.save(update_fields=['cached_title'])
                self.mark_as_valid()
                return

            return self.mark_as_invalid('wcs_card_not_found')

        populate_cache()

    def is_visible(self, request, **kwargs):
        """Hide a user-only cell when there is no authenticated user."""
        if self.only_for_user:
            user = getattr(request, 'user', None)
            if not user or user.is_anonymous:
                return False
        return super().is_visible(request, **kwargs)

    def get_api_url(self, context):
        """Return the list endpoint of the card model.

        The custom view, when the reference has one, is appended to the
        path; a user filter is added for user-only cells when the user has
        a name id.
        """
        parts = self.carddef_reference.split(':')
        path_items = ['/api/cards/%s/list' % parts[1]]
        if len(parts) > 2:
            path_items.append(parts[2])
        url = '/'.join(path_items)
        user = self.get_concerned_user(context)
        if self.only_for_user and user and not user.is_anonymous and user.get_name_id():
            return '%s?filter-user-uuid=%s' % (url, user.get_name_id())
        return url

    def get_cell_extra_context(self, context):
        """Return the context with the cards, pagination size and title.

        ``card_page_base_url`` is added when a page has a sub slug matching
        the card model.
        """
        extra_context = super().get_cell_extra_context(context)
        extra_context['paginate_by'] = self.limit or 10
        extra_context['title'] = self.custom_title or self.cached_title

        pages_with_sub_slug = Page.objects.exclude(sub_slug='')
        card_id = '%s_id' % self.carddef_reference.split(':')[1]
        card_id_pattern = '<%s>' % card_id
        # first page whose sub slug is, or contains, the card identifier
        card_page = next(
            (
                page
                for page in pages_with_sub_slug
                if card_id_pattern in page.sub_slug or page.sub_slug == card_id
            ),
            None,
        )
        if card_page is not None:
            extra_context['card_page_base_url'] = card_page.get_online_url()

        try:
            site_data = extra_context['card_objects'][self.wcs_site]
            extra_context['card_objects'] = list(site_data['data'])
        except (KeyError, TypeError):
            # an error occurred when getting the data
            extra_context['card_objects'] = []

        return extra_context

    def get_default_form_class(self):
        """Return the form class used to edit this cell."""
        from .forms import WcsCardsCellForm

        return WcsCardsCellForm

    def render_for_search(self):
        """Return an empty string: the cell has no content of its own to index."""
        return ''
|
|
|
|
|
|
@register_cell_class
|
|
class WcsCardInfosCell(CardMixin, CellBase):
|
|
carddef_reference = models.CharField(_('Card Model'), max_length=150)
|
|
related_card_path = models.CharField(_('Card Identifier'), max_length=1000, blank=True)
|
|
card_ids = models.CharField(_('Other Card Identifiers'), max_length=1000, blank=True)
|
|
only_for_user = models.BooleanField(_('Limit to cards linked to the logged-in user'), default=False)
|
|
without_user = models.BooleanField(_('Ignore the logged-in user'), default=False)
|
|
limit = models.PositiveSmallIntegerField(
|
|
_('Number of cards per page (default 10)'), null=True, blank=True
|
|
)
|
|
custom_schema = JSONField(blank=True, default=dict)
|
|
display_mode = models.CharField(
|
|
_('Display mode'),
|
|
max_length=10,
|
|
default='card',
|
|
choices=[
|
|
('card', _('Card')),
|
|
('table', _('Table')),
|
|
],
|
|
)
|
|
|
|
title_type = models.CharField(
|
|
_('Title'),
|
|
max_length=20,
|
|
default='auto',
|
|
blank=False,
|
|
choices=[
|
|
('auto', _('Default Title (Card Label)')),
|
|
('manual', _('Custom Title')),
|
|
('empty', _('No Title')),
|
|
],
|
|
)
|
|
custom_title = models.CharField(_('Custom Title'), max_length=150, blank=True)
|
|
|
|
cached_title = models.CharField(_('Title'), max_length=150)
|
|
cached_json = JSONField(blank=True, default=dict)
|
|
|
|
is_enabled = classmethod(is_wcs_enabled)
|
|
|
|
manager_appearance_template = 'combo/wcs/manager/card-infos-cell-form-appearance.html'
|
|
|
|
class Meta:
|
|
verbose_name = _('Card Information Cell')
|
|
|
|
def save(self, *args, **kwargs):
    """Save the cell, then refresh the cached card schema and the validity.

    When ``update_fields`` is passed (which is the case for the nested save
    issued below) only the database write happens.  Otherwise the schema of
    the referenced card model is fetched from w.c.s.:

    * no card model configured: the cell is marked invalid
      (``wcs_card_not_defined``);
    * schema unavailable, or an error other than "Page not found": the cell
      is kept valid, a transient failure must not flag it;
    * "Page not found": the cell is marked invalid (``wcs_card_not_found``);
    * otherwise ``cached_title`` and ``cached_json`` are stored.
    """
    super().save(*args, **kwargs)

    if 'update_fields' in kwargs:
        # don't populate the cache
        return

    if not self.carddef_reference:
        self.mark_as_invalid('wcs_card_not_defined')
        return

    # the reference starts with "<wcs service key>:<card slug>"
    wcs_key, card_slug = self.carddef_reference.split(':')[:2]
    wcs_site = get_wcs_services().get(wcs_key)
    schema = get_wcs_json(wcs_site, 'api/cards/%s/@schema' % card_slug, log_errors='warn')

    if not schema:
        # can not retrieve data, don't report cell as invalid
        self.mark_as_valid()
        return

    if schema.get('err') == 1:
        if schema.get('err_class') == 'Page not found':
            self.mark_as_invalid('wcs_card_not_found')
        else:
            self.mark_as_valid()
        return

    self.cached_title = schema['name']
    self.cached_json = schema
    # update_fields makes this nested call return right after the write
    self.save(update_fields=['cached_title', 'cached_json'])
    self.mark_as_valid()
|
|
|
|
def is_visible(self, request, **kwargs):
    """Tell whether the cell should be displayed for this request.

    A cell limited to the logged-in user is never visible when the request
    carries no user or an anonymous one; in every other case the decision
    is left to the parent class.
    """
    if self.only_for_user:
        user = getattr(request, 'user', None)
        if not user or user.is_anonymous:
            return False
    return super().is_visible(request, **kwargs)
|
|
|
|
def check_validity(self):
    """Mark the cell invalid when its relation path is not a known one.

    The configured path (if any) must be one of the paths computed by
    ``get_related_card_paths()``; otherwise the cell is flagged with
    ``wcs_card_relation_not_found``.  Without a path, or with a known one,
    the cell is marked valid.
    """
    related_path = self.get_related_card_path()
    if related_path:
        known_paths = [entry[0] for entry in self.get_related_card_paths()]
        if related_path not in known_paths:
            self.mark_as_invalid('wcs_card_relation_not_found')
            return

    self.mark_as_valid()
|
|
|
|
@property
|
|
def default_template_name(self):
    """Return the template matching the display mode.

    The table mode uses the list template, any other mode the single card
    template.
    """
    templates = {'table': 'combo/wcs/cards.html'}
    return templates.get(self.display_mode, 'combo/wcs/card.html')
|
|
|
|
@property
|
|
def global_context_key(self):
    """Return the context key under which this cell stores its card ids."""
    reference = self.get_reference()
    return '%s-card-ids' % reference
|
|
|
|
def modify_global_context(self, context, request):
    """Compute the card ids of this cell and store them in ``context``.

    Nothing is done for a table cell outside of a synchronous rendering (no
    w.c.s. call on page loading), for a cell without card model, or when
    the ids are already present in the context.
    """
    table_mode = self.display_mode == 'table'
    if table_mode and not context.get('synchronous'):
        # don't call wcs on page loading
        return
    if not self.carddef_reference:
        return
    key = self.global_context_key
    if key in context:
        return
    context[key] = self.get_card_ids(context, request)
|
|
|
|
def get_repeat_template(self, context):
    """Tell how many times the cell is repeated.

    Returns an empty list in table mode (the cell is not repeated) and the
    number of card ids stored in the context otherwise.
    """
    if self.display_mode == 'table':
        # don't repeat cell if table display mode
        return []
    card_ids = context.get(self.global_context_key) or []
    return len(card_ids)
|
|
|
|
def get_related_card_path(self):
    """Return the relation path, the special value '__all__' giving ''."""
    return '' if self.related_card_path == '__all__' else self.related_card_path
|
|
|
|
def must_get_all(self):
    """Tell whether the cell is configured to display all cards."""
    return self.related_card_path == '__all__'
|
|
|
|
def get_cards_from_ids(self, card_ids, context, synchronous=False):
    """Return the cards matching ``card_ids`` using the w.c.s. list API.

    The cell's custom view is used when there is one, cards are restricted
    to the concerned user when ``only_for_user`` is set, and the id filter
    is skipped when all cards are wanted.

    ``synchronous`` can also be enabled through ``context['synchronous']``;
    the request is made with ``raise_if_not_cached=not synchronous``
    (behaviour of the project's ``requests`` wrapper, not visible here).

    Returns the ``data`` list of a 200 response and ``[]`` in every other
    case.  On a request error with a custom view, ``context['card_not_found']``
    is also set.
    """
    # use custom view if requested
    if self.card_custom_view:
        api_url = '/api/cards/%s/list/%s?full=on' % (self.card_slug, self.card_custom_view)
    else:
        api_url = '/api/cards/%s/list?full=on' % (self.card_slug)

    user = self.get_concerned_user(context)
    if self.only_for_user and user and not user.is_anonymous:
        name_id = user.get_name_id()
        if name_id:
            api_url += '&filter-user-uuid=%s' % name_id

    if not self.must_get_all():
        api_url += '&%s' % '&'.join('filter-internal-id=%s' % cid for cid in card_ids)

    synchronous = synchronous or bool(context.get('synchronous'))
    wcs_site = get_wcs_services().get(self.wcs_site)
    request_user = None if self.without_user else getattr(context.get('request'), 'user', None)

    try:
        response = requests.get(
            api_url,
            remote_service=wcs_site,
            user=request_user,
            without_user=self.without_user,
            cache_duration=5,
            raise_if_not_cached=not synchronous,
            log_errors=False,
        )
        response.raise_for_status()
    except RequestException:
        if self.card_custom_view:
            # if there's a custom view consider the error is a 404 because
            # the card was not found in that view, and mark it so.
            context['card_not_found'] = True
        return []

    if response.status_code != 200:
        return []
    return response.json().get('data') or []
|
|
|
|
def filter_card_ids(self, card_ids, context):
    """Return the ids, among ``card_ids``, of cards w.c.s. actually returns.

    The cards are fetched synchronously with the cell's own settings, so an
    id the user cannot access is dropped.  Ids are compared as strings;
    the returned values are the ids as given by w.c.s., in w.c.s. order.
    """
    # check that all ids in context are available for user
    cards = self.get_cards_from_ids(card_ids, context, synchronous=True)
    # build the lookup once instead of once per returned card
    wanted = {str(card_id) for card_id in card_ids}
    return [card['id'] for card in cards if str(card['id']) in wanted]
|
|
|
|
def get_card_data_from_ids(self, card_id, context):
    """Return the data of card ``card_id`` among the cards of this cell.

    Returns ``None`` when the context holds no card ids for the cell or
    when the card is not part of the fetched list.
    """
    # get the correct card from all known cards for ids in context
    card_ids = context.get(self.global_context_key)
    if not card_ids:
        return None
    if len(card_ids) == 1:
        # if only one id, do not use the list endpoint:
        # may be url can be cached and reused by cells with related
        return self.get_card_data(card_id=card_id, context=context)

    wanted = str(card_id)
    return next(
        (card for card in self.get_cards_from_ids(card_ids, context) if str(card.get('id')) == wanted),
        None,
    )
|
|
|
|
def get_card_data(
    self,
    card_id,
    context,
    card_slug=Ellipsis,
    card_custom_view=Ellipsis,
    only_for_user=Ellipsis,
    without_user=Ellipsis,
    synchronous=False,
):
    """Fetch a single card from w.c.s. and return its JSON payload.

    ``card_slug``, ``card_custom_view``, ``only_for_user`` and
    ``without_user`` default to the settings of this cell; ``Ellipsis`` is
    the "not given" marker so that ``None`` or ``False`` can be passed
    explicitly.

    ``synchronous`` can also be enabled through ``context['synchronous']``;
    the request is made with ``raise_if_not_cached=not synchronous``
    (behaviour of the project's ``requests`` wrapper, not visible here).

    Returns the decoded JSON of a 200 response and ``{}`` in every other
    case.  On a request error with a custom view, ``context['card_not_found']``
    is also set.
    """
    # Ellipsis means "use the value configured on this cell"
    card_slug = self.card_slug if card_slug is Ellipsis else card_slug
    card_custom_view = self.card_custom_view if card_custom_view is Ellipsis else card_custom_view
    only_for_user = self.only_for_user if only_for_user is Ellipsis else only_for_user
    without_user = self.without_user if without_user is Ellipsis else without_user

    if card_custom_view:
        api_url = '/api/cards/%s/%s/%s/?include-files-content=off' % (
            card_slug,
            card_custom_view,
            card_id,
        )
    else:
        api_url = '/api/cards/%s/%s/?include-files-content=off' % (card_slug, card_id)

    user = self.get_concerned_user(context)
    if only_for_user and user and not user.is_anonymous:
        name_id = user.get_name_id()
        if name_id:
            api_url += '&filter-user-uuid=%s' % name_id

    synchronous = synchronous or bool(context.get('synchronous'))
    wcs_site = get_wcs_services().get(self.wcs_site)
    request_user = None if without_user else getattr(context.get('request'), 'user', None)

    try:
        response = requests.get(
            api_url,
            remote_service=wcs_site,
            user=request_user,
            without_user=without_user,
            cache_duration=5,
            raise_if_not_cached=not synchronous,
            log_errors=False,
        )
        response.raise_for_status()
    except RequestException:
        if card_custom_view:
            # if there's a custom view consider the error is a 404 because
            # the card was not found in that view, and mark it so.
            context['card_not_found'] = True
        return {}

    if response.status_code != 200:
        return {}
    return response.json()
|
|
|
|
def get_card_schema(self, card_slug):
    """Return the schema of card model ``card_slug`` on the cell's w.c.s. site.

    Returns ``None`` when the schema could not be retrieved or when w.c.s.
    answered with an error (``err`` equal to 1).
    """
    wcs_site = get_wcs_services().get(self.wcs_site)
    schema = get_wcs_json(wcs_site, 'api/cards/%s/@schema' % card_slug, log_errors='warn')
    if not schema or schema.get('err') == 1:
        return None
    return schema
|
|
|
|
def get_related_card_paths(self):
    """List the relation paths leading to the card model of this cell.

    Starting points are the other card cells of the same page that have a
    slug and display a single explicit card (no relation path, no
    comma-separated ids).  From the cached schema of each of them, relations
    are followed recursively through single ('item' or 'computed', not
    reverse) relations.

    Returns a list of ``(path, label)`` tuples.  Path segments are separated
    by '/', the first one being the slug of the starting cell; a reverse
    relation is written ``reverse:<varname>`` in the path and
    ``<varname> (reverse)`` in the label.  Returns ``[]`` when no card model
    is configured.
    """
    if not self.carddef_reference:
        return []

    def iter_relations(relations, path, label, carddefs_already_seen):
        # copy, so that the list of the caller is not modified
        carddefs_already_seen = carddefs_already_seen[:]
        for relation in relations:
            new_path = '%s/%s%s' % (
                path,
                'reverse:' if relation['reverse'] else '',
                relation['varname'],
            )
            new_label = '%s/%s%s' % (
                label,
                relation['varname'],
                ' (reverse)' if relation['reverse'] else '',
            )
            if relation['obj'] == 'carddef:%s' % self.card_slug:
                # target carddef found
                yield (new_path, new_label)
            if relation['reverse'] or relation['type'] not in ['item', 'computed']:
                # multiple relation, can not be followed further
                continue
            # relation is not multiple, continue to search for matching relations
            new_card_slug = relation['obj'][8:]  # remove 'carddef:'
            if new_card_slug in carddefs_already_seen:
                # checked first: no need to fetch a schema that won't be used
                continue
            new_card_schema = self.get_card_schema(new_card_slug)
            if not new_card_schema:
                continue
            carddefs_already_seen.append(new_card_slug)
            yield from iter_relations(
                relations=new_card_schema.get('relations') or [],
                path=new_path,
                label=new_label,
                carddefs_already_seen=carddefs_already_seen,
            )

    # get cells with explicit ids
    results = []
    for cell in WcsCardInfosCell.objects.filter(page=self.page_id).exclude(pk=self.pk):
        if not cell.slug:
            # no slug
            continue
        if cell.related_card_path:
            # no explicit ids
            continue
        if ',' in cell.card_ids:
            # multiple ids, can not follow relations
            continue
        # follow relations
        results.extend(
            iter_relations(
                relations=cell.cached_json.get('relations') or [],
                path=cell.slug,
                label=cell.slug,
                carddefs_already_seen=[self.card_slug],
            )
        )
    return results
|
|
|
|
def get_card_ids_from_related(self, context, request):
    """Return the card ids reached by following the cell's relation path.

    The first segment of the path is the slug of another card cell of the
    same page, which must display a single explicit card; the next segments
    are relation varnames (prefixed with ``reverse:`` for a reverse
    relation) followed from that card.

    Always returns a list.  It is empty when the path has no relation
    segment, when the first cell can not be used, or when a relation, a
    schema or a card can not be found along the way.
    """

    def get_relation(relations, varname, reverse):
        # first relation with this varname and direction, None if there is none
        for relation in relations:
            if relation['reverse'] == reverse and relation['varname'] == varname:
                if reverse and relation['obj'][8:] != self.card_slug:
                    # if reverse, it's the last part; check it's the correct card model
                    continue
                return relation
        return None

    def follow_data(card_data, relations, varname, parts):
        reverse = varname.startswith('reverse:')
        if reverse:
            varname = varname[8:]  # remove 'reverse:'
        relation = get_relation(relations, varname, reverse)
        if not relation:
            # not found - stop
            return []
        card_slug = relation['obj'][8:]  # remove 'carddef:'

        if not parts and card_slug != self.card_slug:
            # last part, but wrong card model
            return []

        if reverse:
            # reverse relation: always multiple
            if not parts:
                # last part - get ids and stop
                return self.get_card_ids_from_template(
                    '{{ cards|objects:"%s"|filter_by:"%s"|filter_value:"%s"|getlist:"id"|join:"," }}'
                    % (card_slug, varname, card_data['id']),
                    context,
                    request,
                )
            # multiple relation, but still parts to follow - stop
            return []

        # direct relation
        raw_key = '%s_raw' % varname

        if not parts:
            # last part - stop
            raw_value = None
            if raw_key in card_data['fields']:
                raw_value = card_data['fields'][raw_key]
            else:
                # may be a fields block ?  try every "<block>_<field>" split
                varname_parts = varname.split('_')
                for split_at in range(1, len(varname_parts) + 1):
                    block_varname = '%s_raw' % '_'.join(varname_parts[:split_at])
                    if block_varname not in card_data['fields']:
                        continue
                    block_data = card_data['fields'][block_varname]
                    if not block_data:
                        continue
                    field_varname = '%s_raw' % '_'.join(varname_parts[split_at:])
                    values = [v for v in (row.get(field_varname) for row in block_data) if v]
                    if values:
                        return values

            if not raw_value:
                return []
            if not isinstance(raw_value, list):
                # item or computed
                return [raw_value]
            # items
            return raw_value

        if relation['type'] == 'items':
            # multiple relation, but still parts to follow - stop
            return []

        # single relation, get card_data and follow next part

        if not card_data['fields'].get(raw_key):
            # field not found or empty
            return []

        # get schema
        card_schema = self.get_card_schema(card_slug)
        if not card_schema:
            # card schema not found
            return []
        # and data
        next_card_data = self.get_card_data(
            card_id=card_data['fields'][raw_key],
            context=context,
            card_slug=card_slug,
            card_custom_view=None,
            only_for_user=False,
            without_user=False,
            synchronous=True,
        )
        if not next_card_data:
            # card data not found
            return []
        # continue
        return follow_data(
            card_data=next_card_data,
            # a schema without relations means there is nothing to follow
            relations=card_schema.get('relations') or [],
            varname=parts[0],
            parts=parts[1:],
        )

    path_parts = self.get_related_card_path().split('/')
    first_cell_slug = path_parts[0]
    parts = path_parts[1:]
    if not parts:
        # path reduced to a cell slug: no relation to follow
        return []

    try:
        first_cell = WcsCardInfosCell.objects.get(page=self.page_id, slug=first_cell_slug)
    except (WcsCardInfosCell.DoesNotExist, WcsCardInfosCell.MultipleObjectsReturned):
        return []
    if first_cell.related_card_path:
        # no explicit ids
        return []
    if ',' in first_cell.card_ids:
        # multiple ids, can not follow relations
        return []

    # the first cell displays a single card: use its first repetition
    first_cell.repeat_index = 0
    card_id = first_cell.get_card_id(context)
    if not card_id:
        # no card id found
        return []
    card_data = self.get_card_data(
        card_id=card_id,
        context=context,
        card_slug=first_cell.card_slug,
        card_custom_view=first_cell.card_custom_view,
        only_for_user=first_cell.only_for_user,
        without_user=first_cell.without_user,
        synchronous=True,
    )
    if not card_data:
        # card data not found
        return []
    return follow_data(
        card_data=card_data,
        # the cache of the first cell may not be populated
        relations=first_cell.cached_json.get('relations') or [],
        varname=parts[0],
        parts=parts[1:],
    )
|
|
|
|
def get_card_ids_from_template(self, template, original_context, request):
    """Render ``template`` and return the comma-separated ids it produces.

    The template is rendered with a request context on top of which
    ``original_context`` is pushed.  Surrounding whitespace is removed and
    empty items are dropped.  Returns ``[]`` when the template is invalid or
    refers to a missing variable.
    """
    try:
        context = RequestContext(request)
        context.push(original_context)
        rendered = Template(template).render(context)
    except (VariableDoesNotExist, TemplateSyntaxError):
        return []
    stripped = (chunk.strip() for chunk in rendered.split(','))
    return [value for value in stripped if value]
|
|
|
|
def get_card_ids(self, original_context, request):
    """Return the list of card ids this cell has to display.

    Sources are tried in this order:

    1. the relation path, the ids found being checked against w.c.s. with
       the settings of this cell;
    2. all cards (nothing is fetched here in table mode);
    3. the ``card_ids`` template;
    4. the ``<card slug>_id`` variable of the context (page sub slug).
    """
    if not self.carddef_reference:
        # not configured
        return []

    if self.get_related_card_path():
        # look at other cells to get related ids
        related_ids = self.get_card_ids_from_related(original_context, request)
        if related_ids == []:
            return []
        if related_ids:
            # check that ids from related are available for user
            # (use only_for_user and without_user flags)
            if len(related_ids) != 1:
                return self.filter_card_ids(related_ids, original_context)
            # if only one id, do not use the list endpoint:
            # may be url can be cached and reused by cells with related
            found = self.get_card_data(card_id=related_ids[0], context=original_context, synchronous=True)
            return related_ids if found else []

    if self.must_get_all():
        if self.display_mode == 'table':
            # don't call wcs if table mode with all cards
            return []
        # get all cards
        all_cards = self.get_cards_from_ids([], original_context, synchronous=True)
        return [card['id'] for card in all_cards]

    if self.card_ids:
        # card ids template is defined
        return self.get_card_ids_from_template(self.card_ids, original_context, request)

    # get card id from page's sub slug (check in context)
    value = original_context.get('%s_id' % self.card_slug)
    # an empty list means nothing was found in context
    return [value] if value else []
|
|
|
|
def get_card_id(self, context):
    """Return the card id for the current repetition of the cell.

    The repetition index comes from the ``repeat_index`` attribute of the
    cell when it is set and from the context otherwise.  Returns ``None``
    when there is no index, no ids in the context, or no id at that index.
    """
    repeat_index = getattr(self, 'repeat_index', context.get('repeat_index'))
    if repeat_index is None:
        return None
    try:
        return context.get(self.global_context_key)[repeat_index]
    except (IndexError, TypeError):
        # index out of range, or no ids stored in the context
        return None
|
|
|
|
def get_extra_manager_context(self):
    """Add the cached card schema, and an id for it, to the manager context."""
    manager_context = super().get_extra_manager_context()
    if not self.cached_json:
        return manager_context
    manager_context['card_schema'] = self.cached_json
    manager_context['card_schema_id'] = 'cell-%s-card-schema-%s' % (self.pk, self.carddef_reference)
    return manager_context
|
|
|
|
@property
|
|
def css_class_names(self):
    """Return the inherited CSS classes, plus 'card' and the cell reference."""
    inherited = super().css_class_names
    return '%s card %s' % (inherited, self.get_reference())
|
|
|
|
def get_cell_extra_context(self, context):
    """Build the rendering context, delegating to the display mode method.

    The method called is ``get_cell_extra_context_<display_mode>_mode``.
    """
    extra_context = super().get_cell_extra_context(context)
    if self.title_type in ['auto', 'manual']:
        # card mode: default value used if card is not found
        extra_context['title'] = self.cached_title
    mode_method = getattr(self, 'get_cell_extra_context_%s_mode' % self.display_mode)
    return mode_method(context, extra_context)
|
|
|
|
def get_cell_extra_context_table_mode(self, context, extra_context):
    """Complete ``extra_context`` for the table display mode.

    Raises ``NothingInCacheException`` outside of a synchronous rendering.
    Sets the page size, the custom title if any, the base URL of the first
    page whose sub slug refers to the card model, and the cards to list
    (``card_objects``).
    """
    if not context.get('synchronous'):
        raise NothingInCacheException()

    extra_context['paginate_by'] = self.limit or 10
    if self.title_type == 'manual':
        extra_context['title'] = self.custom_title or extra_context['title']
    if not self.carddef_reference:
        # not configured
        return extra_context

    # look for a page displaying a single card of this model
    card_id = '%s_id' % self.carddef_reference.split(':')[1]
    placeholder = '<%s>' % card_id
    card_page = next(
        (
            page
            for page in Page.objects.exclude(sub_slug='')
            if placeholder in page.sub_slug or page.sub_slug == card_id
        ),
        None,
    )
    if card_page is not None:
        extra_context['card_page_base_url'] = card_page.get_online_url()

    card_ids = context.get(self.global_context_key)
    if card_ids or self.related_card_path == '__all__':
        extra_context['card_objects'] = self.get_cards_from_ids(card_ids, context)
    else:
        extra_context['card_objects'] = []

    return extra_context
|
|
|
|
def get_cell_extra_context_card_mode(self, context, extra_context):
    """Complete ``extra_context`` for the card display mode.

    The schema and its fields (indexed by varname) are always added.  When
    the card of the current repetition is found, the card data, the title
    and the rendered templates of the custom schema (``custom_fields`` and
    ``urls``, both indexed by template source) are added too.  When no card
    id is available although ids were expected, ``card_not_found`` is set.
    """
    schema = self.cached_json
    extra_context['schema'] = schema
    extra_context['fields_by_varnames'] = {
        field['varname']: field for field in (schema.get('fields') or []) if field.get('varname')
    }

    card_id = self.get_card_id(context)
    if not card_id:
        if self.card_ids or self.related_card_path:
            # all cards, template or related_card_path defined, but result is empty or None
            extra_context['card_not_found'] = True
        return extra_context

    card_data = self.get_card_data_from_ids(card_id, context)
    if not card_data:
        return extra_context

    extra_context['card'] = card_data
    # built on the extra_context dict itself: later additions to it are
    # visible to the templates rendered below
    custom_context = Context(extra_context, autoescape=False)
    custom_context.update(context)
    custom_context['repeat_index'] = getattr(self, 'repeat_index', context.get('repeat_index')) or 0

    if self.title_type == 'manual':
        try:
            extra_context['title'] = Template(self.custom_title).render(custom_context)
        except (VariableDoesNotExist, TemplateSyntaxError):
            extra_context['title'] = ''
    # auto title or custom_title gives an empty title, use default value + card text
    if self.title_type == 'auto' or (self.title_type == 'manual' and not extra_context['title']):
        extra_context['title'] = '%s - %s' % (self.cached_title, extra_context['card'].get('text'))

    def render_template(item, template_key, template_context, target_key, target_context):
        # render item[template_key] and store the result under its source;
        # a template that can not be rendered is skipped
        source = item.get(template_key)
        if not source:
            return
        try:
            target_context['card'][target_key][source] = Template(source).render(template_context)
        except (VariableDoesNotExist, TemplateSyntaxError):
            return

    extra_context['card']['custom_fields'] = {}
    extra_context['card']['urls'] = {}
    if self.custom_schema:
        for item in self.custom_schema.get('cells') or []:
            if item.get('varname') not in ['@custom@', '@link@']:
                continue
            if not item.get('template'):
                continue
            render_template(
                item=item,
                template_key='template',
                template_context=custom_context,
                target_key='custom_fields',
                target_context=extra_context,
            )

            if item['varname'] == '@link@':
                render_template(
                    item=item,
                    template_key='url_template',
                    template_context=custom_context,
                    target_key='urls',
                    target_context=extra_context,
                )

    return extra_context
|
|
|
|
def get_default_form_class(self):
    """Return the form class used to edit this cell."""
    # function-level import, as in the original code
    # (presumably to avoid a circular import with .forms -- TODO confirm)
    from .forms import WcsCardInfoCellForm

    return WcsCardInfoCellForm
|
|
|
|
def get_appearance_fields(self):
    """Return the names of the fields edited in the appearance form."""
    appearance_fields = ['title_type', 'custom_title']
    return appearance_fields
|
|
|
|
def get_appearance_form_class(self):
    """Return the appearance form class, built on the card specific base form."""
    from .forms import WcsCardInfoCellAppearanceBaseForm as base_form

    return super().get_appearance_form_class(base_options_form_class=base_form)
|
|
|
|
def get_manager_tabs(self):
    """Return the inherited manager tabs followed by the 'display' tab."""
    from .forms import WcsCardInfoCellDisplayForm

    tabs = super().get_manager_tabs()
    display_tab = {
        'slug': 'display',
        'name': _('Display'),
        'template': 'combo/wcs/manager/card-infos-cell-form-display.html',
        'form': WcsCardInfoCellDisplayForm,
    }
    tabs.append(display_tab)
    return tabs
|
|
|
|
def get_custom_schema(self):
    """Return the custom schema, converted from its old formats.

    The entries of the 'cells' list are updated in place:

    * '@custom@' entries: display mode 'value' becomes 'text';
    * other entries: a missing ``field_content`` is deduced from the display
      mode, the display modes 'label', 'value' and 'label-and-value' become
      'text', and a missing ``empty_value`` is set to '@empty@'.

    Returns an empty dict when the cell has no custom schema.
    """
    schema = self.custom_schema or {}
    legacy_text_modes = ['label', 'value', 'label-and-value']

    # migrate old formats
    for cell in schema.get('cells') or []:
        mode = cell.get('display_mode')
        if cell.get('varname') == '@custom@':
            if mode == 'value':
                cell['display_mode'] = 'text'
            continue
        if not cell.get('field_content'):
            # the old display mode also said which content to display
            cell['field_content'] = 'value' if mode == 'title' else (mode or 'value')
        if mode in legacy_text_modes:
            cell['display_mode'] = 'text'
        if not cell.get('empty_value'):
            cell['empty_value'] = '@empty@'

    return schema
|
|
|
|
|
|
@register_cell_class
class TrackingCodeInputCell(CellBase):
    # Cell displaying a tracking code input, for one w.c.s. site or (empty
    # wcs_site, labelled "All Sites" in the form) without a specific one.

    is_enabled = classmethod(is_wcs_enabled)
    # key of the w.c.s. service, as found in get_wcs_services()
    wcs_site = models.CharField(_('Site'), max_length=50, blank=True)
    default_template_name = 'combo/wcs/tracking_code_input.html'

    class Meta:
        verbose_name = _('Tracking Code Input')

    def check_validity(self):
        """Mark the cell invalid when its site is not a known w.c.s. service."""
        if self.wcs_site and self.wcs_site not in get_wcs_services():
            self.mark_as_invalid('wcs_site_not_found')
            return

        self.mark_as_valid()

    def get_default_form_class(self):
        """Return a form to select the site, or None with a single service."""
        services = get_wcs_services()
        if len(services) == 1:
            # nothing to choose
            return None
        combo_wcs_sites = [('', _('All Sites'))] + [
            (key, service.get('title')) for key, service in services.items()
        ]
        return model_forms.modelform_factory(
            self.__class__, fields=['wcs_site'], widgets={'wcs_site': Select(choices=combo_wcs_sites)}
        )

    def get_cell_extra_context(self, context):
        """Add the URL of the w.c.s. site to the rendering context.

        Without an explicit site the first known service is used (and set on
        the instance).  ``url`` is ``None`` when the service is unknown or
        when no service is configured at all.
        """
        extra_context = super().get_cell_extra_context(context)
        services = get_wcs_services()
        if not self.wcs_site and services:
            # guarded: with no service at all there is no first key to take
            self.wcs_site = next(iter(services))
        extra_context['url'] = (services.get(self.wcs_site) or {}).get('url')
        return extra_context
|
|
|
|
|
|
@register_cell_class
class BackofficeSubmissionCell(CategoriesAndWcsSiteValidityMixin, CategoriesFilteringMixin, WcsDataBaseCell):
    # settings read by the base classes (not visible here); the data end up
    # in the rendering context under the 'all_formdefs' key
    api_url = '/api/formdefs/?backoffice-submission=on'
    variable_name = 'all_formdefs'
    default_template_name = 'combo/wcs/backoffice_submission.html'
    cache_duration = 120
    user_dependant = True

    categories = JSONField(_('Categories'), blank=True, default=dict)

    class Meta:
        verbose_name = _('Backoffice Submission')

    class Media:
        js = (
            '/jsi18n',
            'js/combo.submission.js',
        )
|
|
|
|
def get_default_form_class(self):
    """Return the form class used to edit this cell."""
    # function-level import, as in the original code
    # (presumably to avoid a circular import with .forms -- TODO confirm)
    from .forms import BackofficeSubmissionCellForm

    return BackofficeSubmissionCellForm
|
|
|
|
def get_cell_extra_context(self, context):
    """Keep only the sites having forms, every form getting a category.

    ``all_formdefs``, as computed by the parent class, is replaced by a dict
    limited to the sites with a non-empty ``data`` list; forms without a
    category get the "Misc" one.
    """
    extra_context = super().get_cell_extra_context(context)
    per_site = extra_context.pop('all_formdefs')

    # drop sites without any form
    kept = {site: payload for site, payload in per_site.items() if payload.get('data')}

    # add a fake category where it's missing
    for payload in kept.values():
        for formdef in payload['data']:
            if 'category' not in formdef:
                formdef['category'] = _('Misc')

    extra_context['all_formdefs'] = kept
    return extra_context
|