combo/combo/apps/wcs/models.py

1069 lines
37 KiB
Python

# -*- coding: utf-8 -*-
#
# combo - content management system
# Copyright (C) 2014-2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import copy
import logging
from django.conf import settings
from django.contrib.postgres.fields import JSONField
from django.db import models
from django.forms import Select
from django.forms import models as model_forms
from django.utils.safestring import mark_safe
from django.utils.text import slugify
from django.utils.translation import ugettext_lazy as _
from requests.exceptions import RequestException
from combo import utils
from combo.data.library import register_cell_class
from combo.data.models import CellBase, Page
from combo.utils import requests
from .utils import get_wcs_json, get_wcs_services, is_wcs_enabled
invalid_reason_codes = {
'wcs_form_not_defined': _('No form set'),
'wcs_form_not_found': _('Invalid form'),
'wcs_card_not_defined': _('No card model set'),
'wcs_card_not_found': _('Invalid card model'),
'wcs_category_not_defined': _('No category set'),
'wcs_category_not_found': _('Invalid category'),
'wcs_data_failure': _('Failed to get data'),
}
def get_formdef_css_classes(formdef):
classes = []
if formdef.get('redirection'):
classes.append('is-redirection')
if formdef.get('authentication_required'):
classes.append('required-authentication')
for authn_context in formdef.get('required_authentication_contexts') or []:
classes.append('required-%s-authentication' % authn_context)
for keyword in formdef.get('keywords') or []:
classes.append('keyword-%s' % slugify(keyword))
return classes
@register_cell_class
class WcsFormCell(CellBase):
formdef_reference = models.CharField(_('Form'), max_length=150)
cached_title = models.CharField(_('Title'), max_length=150)
cached_url = models.URLField(_('URL'))
cached_json = JSONField(blank=True, default=dict)
template_name = 'combo/wcs/form.html'
add_as_link_label = _('add a form link')
add_link_label = _('New form link')
edit_link_label = _('Edit form link')
add_as_link_code = 'form-link'
invalid_reason_codes = invalid_reason_codes
is_enabled = classmethod(is_wcs_enabled)
class Meta:
verbose_name = _('Form Link')
def get_default_form_class(self):
from .forms import WcsFormCellForm
return WcsFormCellForm
def save(self, *args, **kwargs):
if 'update_fields' in kwargs:
# don't populate the cache
return super().save(*args, **kwargs)
def populate_cache():
if self.formdef_reference:
wcs_key, form_slug = self.formdef_reference.split(':')
wcs_site = get_wcs_services().get(wcs_key)
forms_response_json = get_wcs_json(wcs_site, 'api/formdefs/')
if not forms_response_json or forms_response_json.get('err') == 1:
# can not retrieve data, don't report cell as invalid
self.mark_as_valid()
return
form_found = False
for form in forms_response_json.get('data') or []:
slug = form.get('slug')
if slug == form_slug:
self.cached_title = form.get('title')
self.cached_url = form.get('url')
self.cached_json = form
self.save(update_fields=['cached_title', 'cached_url', 'cached_json'])
form_found = True
break
if form_found:
self.mark_as_valid()
return
else:
return self.mark_as_invalid('wcs_form_not_found')
else:
return self.mark_as_invalid('wcs_form_not_defined')
super().save(*args, **kwargs)
populate_cache()
def get_cell_extra_context(self, context):
context = super(WcsFormCell, self).get_cell_extra_context(context)
context['slug'] = self.formdef_reference.split(':')[-1]
context['title'] = self.cached_title
context['url'] = self.cached_url + 'tryauth'
if self.cached_json:
context['description'] = mark_safe(self.cached_json.get('description', ''))
context['css_classes'] = get_formdef_css_classes(self.cached_json)
for attribute in self.cached_json:
if not attribute in context:
context[attribute] = self.cached_json.get(attribute)
return context
def get_additional_label(self):
if not self.cached_title:
return
return self.cached_title
def render_for_search(self):
return ''
def get_external_links_data(self):
if not (self.cached_url and self.cached_title):
return []
text = ''
if self.cached_json:
text = ' '.join(
[self.cached_json.get('description', ''), ' '.join(self.cached_json.get('keywords', []))]
).strip()
return [
{
'url': self.cached_url,
'title': self.cached_title,
'text': text,
}
]
def get_slug_for_asset(self):
return self.formdef_reference
def get_label_for_asset(self):
return str(self)
def get_asset_slot_key(self, key):
# for legacy
return 'wcs:form:%s:%s' % (key, self.get_slug_for_asset())
def get_asset_slot_templates(self):
# for legacy
if settings.WCS_FORM_ASSET_SLOTS:
return settings.WCS_FORM_ASSET_SLOTS
return super().get_asset_slot_templates()
class WcsCommonCategoryCell(CellBase):
is_enabled = classmethod(is_wcs_enabled)
category_reference = models.CharField(_('Category'), max_length=150)
cached_title = models.CharField(_('Title'), max_length=150)
cached_description = models.TextField(_('Description'), blank=True)
cached_url = models.URLField(_('Cached URL'))
invalid_reason_codes = invalid_reason_codes
class Meta:
abstract = True
def save(self, *args, **kwargs):
if 'update_fields' in kwargs:
# don't populate the cache
return super().save(*args, **kwargs)
def populate_cache():
if self.category_reference:
wcs_key, category_slug = self.category_reference.split(':')
wcs_site = get_wcs_services().get(wcs_key)
categories_response_json = get_wcs_json(wcs_site, 'api/categories/')
if not categories_response_json or categories_response_json.get('err') == 1:
# can not retrieve data, don't report cell as invalid
self.mark_as_valid()
return
category_found = False
for category in categories_response_json.get('data') or []:
slug = category.get('slug')
if slug == category_slug:
self.cached_title = category.get('title')
self.cached_description = category.get('description') or ''
self.cached_url = category.get('url')
self.save(update_fields=['cached_title', 'cached_description', 'cached_url'])
category_found = True
break
if category_found:
self.mark_as_valid()
return
else:
return self.mark_as_invalid('wcs_category_not_found')
else:
return self.mark_as_invalid('wcs_category_not_defined')
super().save(*args, **kwargs)
populate_cache()
def get_additional_label(self):
if not self.cached_title:
return
return self.cached_title
def get_slug_for_asset(self):
return self.category_reference
def get_label_for_asset(self):
return str(self)
def get_asset_slot_key(self, key):
# for legacy
return 'wcs:category:%s:%s' % (key, self.get_slug_for_asset())
def get_asset_slot_templates(self):
# for legacy
if settings.WCS_CATEGORY_ASSET_SLOTS:
return settings.WCS_CATEGORY_ASSET_SLOTS
return super().get_asset_slot_templates()
@register_cell_class
class WcsCategoryCell(WcsCommonCategoryCell):
template_name = 'combo/wcs/category.html'
class Meta:
verbose_name = _('Category Link')
def get_default_form_class(self):
from .forms import WcsCategoryCellForm
return WcsCategoryCellForm
def get_cell_extra_context(self, context):
context = super(WcsCategoryCell, self).get_cell_extra_context(context)
if not self.category_reference:
return context
context['slug'] = self.category_reference.split(':')[-1]
context['title'] = self.cached_title
context['description'] = self.cached_description
context['url'] = self.cached_url
return context
class WcsBlurpMixin(object):
is_enabled = classmethod(is_wcs_enabled)
cache_duration = 5
api_url = None
warn_on_4xx = True
invalid_reason_codes = invalid_reason_codes
def get_api_url(self, context):
return self.api_url
def get_api_url_for_site(self, context, wcs_slug):
return self.get_api_url(context)
def get_data(self, context):
if context.get('placeholder_search_mode'):
# don't call webservices when we're just looking for placeholders
return {}
if self.wcs_site:
try:
wcs_sites = {self.wcs_site: get_wcs_services()[self.wcs_site]}
except KeyError:
# in case of the site disappeared from settings
return {}
else:
wcs_sites = get_wcs_services()
result = {}
returns = set([])
for wcs_slug, _wcs_site in wcs_sites.items():
api_url = self.get_api_url_for_site(context, wcs_slug)
if not api_url:
# nothing to call for this site
continue
url = _wcs_site.get('url')
if not url.endswith('/'):
url += '/'
wcs_site = copy.deepcopy(_wcs_site)
result[wcs_slug] = wcs_site
wcs_site['base_url'] = url
wcs_site['slug'] = wcs_slug
try:
response = requests.get(
api_url,
remote_service=wcs_site,
user=None
if getattr(self, 'without_user', False)
else getattr(context.get('request'), 'user', None),
without_user=getattr(self, 'without_user', False),
cache_duration=self.cache_duration,
raise_if_not_cached=not (context.get('synchronous')),
log_errors=False,
)
returns.add(response.status_code)
response.raise_for_status()
except RequestException:
continue
json_response = response.json()
if json_response.get('err', 0) == 0:
wcs_site['data'] = json_response['data']
else:
# skip errors
continue
# and mark all items with the site info
for item in wcs_site['data']:
item['site_slug'] = wcs_slug
if returns and 200 not in returns: # not a single valid answer
logging.debug('failed to get data from any %s (%r)', api_url, returns)
if all([400 <= r < 500 for r in returns]) and self.warn_on_4xx:
# only 4xx errors, not a user cell, report the cell as invalid
self.mark_as_invalid('wcs_data_failure', force=False)
else:
self.mark_as_valid()
else:
self.mark_as_valid()
return result
def get_cell_extra_context(self, context):
return {self.variable_name: self.get_data(context)}
class WcsDataBaseCell(CellBase, WcsBlurpMixin):
is_enabled = classmethod(is_wcs_enabled)
wcs_site = models.CharField(_('Site'), max_length=50, blank=True)
class Meta:
abstract = True
def get_additional_label(self):
wcs_sites = dict([(x, y.get('title')) for x, y in get_wcs_services().items()])
if len(wcs_sites.keys()) < 2:
return ''
if self.wcs_site in wcs_sites:
return wcs_sites[self.wcs_site]
return _('All Sites')
def get_form_fields(self):
if len(get_wcs_services()) == 1:
return []
return ['wcs_site']
def get_form_widgets(self):
if len(get_wcs_services()) == 1:
return {}
combo_wcs_sites = [('', _('All'))]
wcs_sites = [(x, y.get('title')) for x, y in get_wcs_services().items()]
wcs_sites.sort(key=lambda x: x[1])
combo_wcs_sites.extend(wcs_sites)
return {'wcs_site': Select(choices=combo_wcs_sites)}
def get_default_form_class(self):
return model_forms.modelform_factory(
self.__class__, fields=self.get_form_fields(), widgets=self.get_form_widgets()
)
def get_cell_extra_context(self, context):
extra_context = super(WcsDataBaseCell, self).get_cell_extra_context(context)
extra_context.update(WcsBlurpMixin.get_cell_extra_context(self, context))
return extra_context
class WcsUserDataBaseCell(WcsDataBaseCell):
warn_on_4xx = False
user_dependant = True
class Meta:
abstract = True
def is_visible(self, **kwargs):
user = kwargs.get('user')
if not user or user.is_anonymous:
return False
return super(WcsUserDataBaseCell, self).is_visible(**kwargs)
class CategoriesValidityMixin(object):
def check_validity(self):
categories = self.categories.get('data', [])
if not categories:
return
for category_reference in categories:
wcs_key, category_slug = category_reference.split(':')
wcs_site = get_wcs_services().get(wcs_key)
categories_response_json = get_wcs_json(wcs_site, 'api/categories/')
if not categories_response_json or categories_response_json.get('err') == 1:
# can not retrieve data, don't report cell as invalid
continue
category_found = any(
[c.get('slug') == category_slug for c in categories_response_json.get('data') or []]
)
if not category_found:
self.mark_as_invalid('wcs_category_not_found')
return
self.mark_as_valid()
class CategoriesFilteringMixin(object):
def get_api_url_for_site(self, context, wcs_slug):
url = self.get_api_url(context)
if self.categories and self.categories.get('data') or []:
categories_by_site = collections.defaultdict(list)
for category in self.categories['data']:
key, slug = category.split(':')
categories_by_site[key].append(slug)
if not categories_by_site.get(wcs_slug):
return None
separator = '?' if '?' not in url else '&'
url += '%scategory_slugs=%s' % (separator, ','.join(categories_by_site.get(wcs_slug)))
return url
@register_cell_class
class WcsCurrentFormsCell(CategoriesValidityMixin, CategoriesFilteringMixin, WcsUserDataBaseCell):
variable_name = 'user_forms'
loading_message = _('Loading forms...')
categories = JSONField(_('Categories'), blank=True, default=dict)
current_forms = models.BooleanField(_('Current Forms'), default=True)
done_forms = models.BooleanField(_('Done Forms'), default=False)
include_drafts = models.BooleanField(_('Include drafts'), default=False)
class Meta:
verbose_name = _('User Forms')
def get_default_form_class(self):
from .forms import WcsCurrentFormsCellForm
return WcsCurrentFormsCellForm
def get_api_url(self, context):
user = self.get_concerned_user(context)
base_url = '/api/user/forms'
if user:
user_name_id = user.get_name_id()
if user_name_id:
base_url = '/api/users/%s/forms' % user_name_id
url = base_url + '?limit=100&sort=desc'
if self.include_drafts:
url += '&include-drafts=on'
return url
@property
def template_name(self):
if self.current_forms and self.done_forms:
return 'combo/wcs/user_all_forms.html'
if self.done_forms:
return 'combo/wcs/user_done_forms.html'
return 'combo/wcs/current_forms.html'
def get_additional_label(self):
initial_label = super(WcsCurrentFormsCell, self).get_additional_label()
if self.include_drafts:
if self.current_forms and self.done_forms:
label = _('All Forms and Drafts')
elif self.done_forms:
label = _('Done Forms and Drafts')
else:
label = _('Current Forms and Drafts')
else:
if self.current_forms and self.done_forms:
label = _('All Forms')
elif self.done_forms:
label = _('Done Forms')
else:
label = _('Current Forms')
if initial_label:
return '%s - %s' % (initial_label, label)
return label
def get_cell_extra_context(self, context):
context = super(WcsCurrentFormsCell, self).get_cell_extra_context(context)
for wcs_site in context['user_forms']:
if not context['user_forms'].get(wcs_site):
continue
if not context['user_forms'][wcs_site].get('data'):
continue
forms = context['user_forms'][wcs_site]['data']
if self.current_forms and self.done_forms:
pass # keep them all
elif self.current_forms:
forms = [x for x in forms if not x.get('form_status_is_endpoint')]
elif self.done_forms:
forms = [x for x in forms if x.get('form_status_is_endpoint')]
else:
forms = [] # nothing left
context['user_forms'][wcs_site]['data'] = forms # put it back
context['current_forms'] = context['user_forms'] # legacy
# regroup all forms in a flat list
context['forms'] = []
for wcs_site in context['user_forms']:
if not context['user_forms'].get(wcs_site):
continue
if not context['user_forms'][wcs_site].get('data'):
continue
context['forms'].extend(context['user_forms'][wcs_site]['data'])
return context
@register_cell_class
class WcsCurrentDraftsCell(CategoriesValidityMixin, CategoriesFilteringMixin, WcsUserDataBaseCell):
variable_name = 'current_drafts'
template_name = 'combo/wcs/current_drafts.html'
loading_message = _('Loading drafts...')
categories = JSONField(_('Categories'), blank=True, default=dict)
class Meta:
verbose_name = _('Current Drafts')
def get_default_form_class(self):
from .forms import WcsCurrentDraftsCellForm
return WcsCurrentDraftsCellForm
def get_api_url(self, context):
user = self.get_concerned_user(context)
if user:
user_name_id = user.get_name_id()
if user_name_id:
return '/api/users/%s/drafts' % user_name_id
return '/api/user/drafts'
def get_cell_extra_context(self, context):
context = super(WcsCurrentDraftsCell, self).get_cell_extra_context(context)
for wcs_site in context['current_drafts']:
if not context['current_drafts'].get(wcs_site):
continue
if not context['current_drafts'][wcs_site].get('data'):
continue
forms = context['current_drafts'][wcs_site]['data']
context['current_drafts'][wcs_site]['data'] = forms # put it back
# regroup all forms in a flat list
context['drafts'] = []
for wcs_site in context['current_drafts']:
if not context['current_drafts'].get(wcs_site):
continue
if not context['current_drafts'][wcs_site].get('data'):
continue
context['drafts'].extend(context['current_drafts'][wcs_site]['data'])
return context
@register_cell_class
class WcsFormsOfCategoryCell(WcsCommonCategoryCell, WcsBlurpMixin):
ordering = models.CharField(
_('Order'),
max_length=20,
default='alpha',
blank=False,
choices=[
('alpha', _('Default (alphabetical)')),
('popularity', _('Popularity')),
('manual', _('Manual')),
],
)
manual_order = JSONField(
blank=True,
default=dict,
verbose_name=_('Manual Order'),
help_text=_('Use drag and drop to reorder forms'),
)
limit = models.PositiveSmallIntegerField(_('Limit'), null=True, blank=True)
class Meta:
verbose_name = _('Forms of Category')
variable_name = 'forms'
template_name = 'combo/wcs/forms_of_category.html'
cache_duration = 600
def get_default_form_class(self):
from .forms import WcsFormsOfCategoryCellForm
return WcsFormsOfCategoryCellForm
@property
def wcs_site(self):
return self.category_reference.split(':')[0]
def get_api_url(self, context):
api_url = '/api/categories/%s/formdefs/' % self.category_reference.split(':')[1]
if self.ordering == 'popularity':
api_url += '?include-count=on'
return api_url
def is_relevant(self, context):
return bool(self.category_reference)
def check_validity(self):
if not self.category_reference:
return
wcs_key, category_slug = self.category_reference.split(':')
wcs_site = get_wcs_services().get(wcs_key)
categories_response_json = get_wcs_json(wcs_site, 'api/categories/')
if not categories_response_json or categories_response_json.get('err') == 1:
# can not retrieve data, don't report cell as invalid
return
category_found = any(
[c.get('slug') == category_slug for c in categories_response_json.get('data') or []]
)
if not category_found:
self.mark_as_invalid('wcs_category_not_found')
return
self.mark_as_valid()
def get_cell_extra_context(self, context):
extra_context = super(WcsFormsOfCategoryCell, self).get_cell_extra_context(context)
extra_context.update(WcsBlurpMixin.get_cell_extra_context(self, context))
if not self.category_reference:
return extra_context
extra_context['slug'] = self.category_reference.split(':')[-1]
extra_context['title'] = self.cached_title
extra_context['description'] = self.cached_description
try:
extra_context['forms'] = list(extra_context['forms'][self.wcs_site]['data'])
except (KeyError, TypeError) as e:
# an error occured when getting the data
extra_context['forms'] = []
# default sort is alphabetical, it's always done as this will serve as
# secondary sort key (thanks to Python stable sort)
extra_context['forms'] = sorted(extra_context['forms'], key=lambda x: x.get('title'))
extra_context['more_forms'] = []
if self.ordering == 'popularity':
extra_context['forms'] = sorted(
extra_context['forms'], key=lambda x: x.get('count'), reverse=True
)
elif self.ordering == 'manual':
if self.manual_order:
manual_order = self.manual_order.get('data')
for form in extra_context['forms']:
form_reference = '%s:%s' % (self.category_reference, form['slug'])
try:
form['order'] = manual_order.index(form_reference)
except ValueError:
form['order'] = 9999
extra_context['forms'] = sorted(extra_context['forms'], key=lambda x: x.get('order', 9999))
for formdef in extra_context['forms']:
formdef['css_classes'] = get_formdef_css_classes(formdef)
if self.limit:
if len(extra_context['forms']) > self.limit:
extra_context['more_forms'] = extra_context['forms'][self.limit :]
extra_context['forms'] = extra_context['forms'][: self.limit]
return extra_context
def render_for_search(self):
return ''
def get_external_links_data(self):
if not self.category_reference:
return
formdefs = self.get_data({'synchronous': True})
for site in formdefs.values():
for formdef in site.get('data', []):
text = ' '.join([formdef.get('description', '')] + formdef.get('keywords', []))
yield {'url': formdef['url'], 'title': formdef['title'], 'text': text}
@register_cell_class
class WcsCareFormsCell(CategoriesValidityMixin, CategoriesFilteringMixin, WcsDataBaseCell):
categories = JSONField(_('Categories'), blank=True, default=dict)
api_url = '/api/forms/?limit=10'
variable_name = 'care_forms'
template_name = 'combo/wcs/care_forms.html'
cache_duration = 120
user_dependant = True
class Meta:
verbose_name = _('Forms to process')
def get_default_form_class(self):
from .forms import WcsCareFormsCellForm
return WcsCareFormsCellForm
def get_cell_extra_context(self, context):
context = super().get_cell_extra_context(context)
categories_filter = []
if self.categories:
for category in self.categories.get('data', []):
categories_filter.append(tuple(category.split(':')))
for wcs_site in context['care_forms']:
if not context['care_forms'].get(wcs_site):
continue
context['care_forms'][wcs_site]['categories'] = [v for k, v in categories_filter if k == wcs_site]
return context
@register_cell_class
class CategoriesCell(WcsDataBaseCell):
api_url = '/api/categories/?full=on'
variable_name = 'form_categories'
template_name = 'combo/wcs/form_categories.html'
cache_duration = 600
class Meta:
verbose_name = _('Form Categories')
class CardMixin(object):
invalid_reason_codes = invalid_reason_codes
def is_relevant(self, context):
return bool(self.carddef_reference)
@property
def wcs_site(self):
return self.carddef_reference.split(':')[0]
def get_additional_label(self):
if not self.cached_title:
return
return self.cached_title
@register_cell_class
class WcsCardsCell(CardMixin, WcsBlurpMixin, CellBase):
carddef_reference = models.CharField(_('Card Model'), max_length=150)
cached_title = models.CharField(_('Title'), max_length=150)
custom_title = models.CharField(_('Custom Title'), max_length=150, blank=True)
only_for_user = models.BooleanField(_('Limit to cards linked to the logged-in user'), default=False)
without_user = models.BooleanField(_('Ignore the logged-in user'), default=False)
template_name = 'combo/wcs/cards.html'
variable_name = 'cards'
class Meta:
verbose_name = _('Cards')
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
if 'update_fields' in kwargs:
# don't populate the cache
return
def populate_cache():
if self.carddef_reference:
parts = self.carddef_reference.split(':')
wcs_key, card_slug = parts[:2]
wcs_site = get_wcs_services().get(wcs_key)
card_models = get_wcs_json(wcs_site, 'api/cards/@list')
if not card_models or card_models.get('err') == 1:
# can not retrieve data, don't report cell as invalid
self.mark_as_valid()
return
card_found = False
for card in card_models.get('data') or []:
slug = card['slug']
if slug != card_slug:
continue
card_title = card['title']
if len(parts) > 2:
custom_view = None
for v in card.get('custom_views') or []:
if v['id'] == parts[2]:
custom_view = v
break
if custom_view is None:
continue
card_title = '%s - %s' % (card_title, custom_view['text'])
self.cached_title = card_title
self.save(update_fields=['cached_title'])
card_found = True
break
if card_found:
self.mark_as_valid()
return
else:
return self.mark_as_invalid('wcs_card_not_found')
else:
return self.mark_as_invalid('wcs_card_not_defined')
populate_cache()
def is_visible(self, **kwargs):
user = kwargs.get('user')
if self.only_for_user and (not user or user.is_anonymous):
return False
return super().is_visible(**kwargs)
def get_api_url(self, context):
parts = self.carddef_reference.split(':')
url = '/api/cards/%s/list' % parts[1]
if len(parts) > 2:
url = '%s/%s' % (url, parts[2])
user = self.get_concerned_user(context)
if self.only_for_user and user and user.get_name_id():
url = '%s?filter-user-uuid=%s' % (url, user.get_name_id())
return url
def get_cell_extra_context(self, context):
extra_context = super().get_cell_extra_context(context)
extra_context['title'] = self.custom_title or self.cached_title
pages_with_sub_slug = Page.objects.exclude(sub_slug='')
card_id = '%s_id' % self.carddef_reference.split(':')[1]
matching_pages = [
p for p in pages_with_sub_slug if '<%s>' % card_id in p.sub_slug or p.sub_slug == card_id
]
if matching_pages:
card_page = matching_pages[0]
extra_context['card_page_base_url'] = card_page.get_online_url()
try:
extra_context['cards'] = list(extra_context['cards'][self.wcs_site]['data'])
except (KeyError, TypeError):
# an error occured when getting the data
extra_context['cards'] = []
return extra_context
def get_default_form_class(self):
from .forms import WcsCardsCellForm
return WcsCardsCellForm
def render_for_search(self):
return ''
@register_cell_class
class WcsCardInfosCell(CardMixin, CellBase):
carddef_reference = models.CharField(_('Card Model'), max_length=150)
card_id = models.CharField(_('Card Identifier'), max_length=150, blank=True)
without_user = models.BooleanField(_('Ignore the logged-in user'), default=False)
cached_title = models.CharField(_('Title'), max_length=150)
cached_json = JSONField(blank=True, default=dict)
is_enabled = classmethod(is_wcs_enabled)
template_name = 'combo/wcs/card.html'
manager_form_template = 'combo/wcs/manager/card-infos-cell-form.html'
class Meta:
verbose_name = _('Card Information Cell')
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
if 'update_fields' in kwargs:
# don't populate the cache
return
def populate_cache():
if self.carddef_reference:
wcs_key, card_slug = self.carddef_reference.split(':')
wcs_site = get_wcs_services().get(wcs_key)
card_schema = get_wcs_json(wcs_site, 'api/cards/%s/@schema' % card_slug, log_errors='warn')
if not card_schema:
# can not retrieve data, don't report cell as invalid
self.mark_as_valid()
return
if card_schema.get('err') == 1:
if card_schema.get('err_class') == 'Page not found':
self.mark_as_invalid('wcs_card_not_found')
else:
self.mark_as_valid()
return
self.cached_title = card_schema['name']
self.cached_json = card_schema
self.save(update_fields=['cached_title', 'cached_json'])
self.mark_as_valid()
else:
self.mark_as_invalid('wcs_card_not_defined')
populate_cache()
def get_card_id(self, context):
if self.card_id:
try:
card_id = utils.get_templated_url('{%% load wcs %%}%s' % self.card_id, context)
except utils.TemplateError:
return None
else:
if card_id:
return card_id
card_slug = self.carddef_reference.split(':')[1]
card_id = '%s_id' % card_slug
return context.get(card_id) or None
def get_extra_manager_context(self):
extra_context = super().get_extra_manager_context()
if self.cached_json:
extra_context['card_schema'] = self.cached_json
extra_context['card_schema_id'] = 'cell-%s-card-schema-%s' % (self.pk, self.carddef_reference)
return extra_context
def get_cell_extra_context(self, context):
extra_context = super().get_cell_extra_context(context)
extra_context['title'] = self.cached_title
extra_context['schema'] = self.cached_json
card_id = self.get_card_id(context)
if not card_id:
return extra_context
card_slug = self.carddef_reference.split(':')[1]
api_url = '/api/cards/%s/%s/' % (card_slug, card_id)
wcs_site = get_wcs_services().get(self.wcs_site)
try:
response = requests.get(
api_url,
remote_service=wcs_site,
user=None if self.without_user else getattr(context.get('request'), 'user', None),
without_user=self.without_user,
cache_duration=5,
raise_if_not_cached=not (context.get('synchronous')),
log_errors=False,
)
response.raise_for_status()
except RequestException:
return extra_context
if response.status_code == 200:
extra_context['card'] = response.json()
return extra_context
def get_default_form_class(self):
from .forms import WcsCardInfoCellForm
return WcsCardInfoCellForm
@register_cell_class
class TrackingCodeInputCell(CellBase):
is_enabled = classmethod(is_wcs_enabled)
wcs_site = models.CharField(_('Site'), max_length=50, blank=True)
template_name = 'combo/wcs/tracking_code_input.html'
class Meta:
verbose_name = _('Tracking Code Input')
def get_default_form_class(self):
if len(get_wcs_services()) == 1:
return None
combo_wcs_sites = [('', _('All Sites'))] + [
(x, y.get('title')) for x, y in get_wcs_services().items()
]
return model_forms.modelform_factory(
self.__class__, fields=['wcs_site'], widgets={'wcs_site': Select(choices=combo_wcs_sites)}
)
def get_cell_extra_context(self, context):
extra_context = super(TrackingCodeInputCell, self).get_cell_extra_context(context)
if not self.wcs_site:
self.wcs_site = list(get_wcs_services().keys())[0]
extra_context['url'] = get_wcs_services().get(self.wcs_site).get('url')
return extra_context
@register_cell_class
class BackofficeSubmissionCell(CategoriesValidityMixin, CategoriesFilteringMixin, WcsDataBaseCell):
api_url = '/api/formdefs/?backoffice-submission=on'
variable_name = 'all_formdefs'
template_name = 'combo/wcs/backoffice_submission.html'
cache_duration = 600
user_dependant = True
categories = JSONField(_('Categories'), blank=True, default=dict)
class Meta:
verbose_name = _('Backoffice Submission')
class Media:
js = (
'/jsi18n',
'js/combo.submission.js',
)
def get_default_form_class(self):
from .forms import BackofficeSubmissionCellForm
return BackofficeSubmissionCellForm
def get_cell_extra_context(self, context):
context = super(BackofficeSubmissionCell, self).get_cell_extra_context(context)
all_formdefs = context.pop('all_formdefs')
formdefs = {}
# add a fake category where it's missing
for key, site_formdefs in all_formdefs.items():
if not site_formdefs.get('data'):
continue
formdefs[key] = site_formdefs
for formdef in site_formdefs['data']:
if 'category' not in formdef:
formdef['category'] = _('Misc')
context['all_formdefs'] = formdefs
return context