wcs: add wcs cards|objects template filters (#64714)
This commit is contained in:
parent
13cdff1e78
commit
a37ceab9cd
|
@ -0,0 +1,36 @@
|
|||
# publik-django-templatetags
|
||||
# Copyright (C) 2022 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django import template
|
||||
|
||||
register = template.Library()
|
||||
|
||||
|
||||
@register.filter
|
||||
def getlist(mapping, key):
|
||||
if mapping is None:
|
||||
return []
|
||||
mapping = list(mapping)
|
||||
for value in mapping:
|
||||
try:
|
||||
yield value.get(key)
|
||||
except AttributeError:
|
||||
yield None
|
||||
|
||||
|
||||
@register.filter(name='list')
|
||||
def as_list(obj):
|
||||
return list(obj)
|
|
@ -0,0 +1,29 @@
|
|||
# publik-django-templatetags
|
||||
# Copyright (C) 2022 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import urllib.parse
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
|
||||
def get_known_service_for_url(url):
|
||||
netloc = urllib.parse.urlparse(url).netloc
|
||||
for services in settings.KNOWN_SERVICES.values():
|
||||
for service in services.values():
|
||||
remote_url = service.get('url')
|
||||
if urllib.parse.urlparse(remote_url).netloc == netloc:
|
||||
return service
|
||||
return None
|
|
@ -0,0 +1,147 @@
|
|||
# publik-django-templatetags
|
||||
# Copyright (C) 2022 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import urllib.parse
|
||||
from io import BytesIO
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.cache import cache
|
||||
from django.utils.encoding import smart_bytes
|
||||
from django.utils.http import urlencode
|
||||
from requests import Response
|
||||
from requests import Session as RequestsSession
|
||||
from requests.auth import AuthBase
|
||||
|
||||
from .misc import get_known_service_for_url
|
||||
from .signature import sign_url
|
||||
|
||||
|
||||
class NothingInCacheException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class PublikSignature(AuthBase):
|
||||
def __init__(self, secret):
|
||||
self.secret = secret
|
||||
|
||||
def __call__(self, request):
|
||||
request.url = sign_url(request.url, self.secret)
|
||||
return request
|
||||
|
||||
|
||||
class Requests(RequestsSession):
|
||||
def request(self, method, url, **kwargs):
|
||||
remote_service = kwargs.pop('remote_service', None)
|
||||
cache_duration = kwargs.pop('cache_duration', 15)
|
||||
invalidate_cache = kwargs.pop('invalidate_cache', False)
|
||||
user = kwargs.pop('user', None)
|
||||
django_request = kwargs.pop('django_request', None)
|
||||
without_user = kwargs.pop('without_user', False)
|
||||
federation_key = kwargs.pop('federation_key', 'auto') # 'auto', 'email', 'nameid'
|
||||
raise_if_not_cached = kwargs.pop('raise_if_not_cached', False)
|
||||
log_errors = kwargs.pop('log_errors', True)
|
||||
|
||||
# don't use persistent cookies
|
||||
self.cookies.clear()
|
||||
|
||||
if remote_service == 'auto':
|
||||
remote_service = get_known_service_for_url(url)
|
||||
if remote_service:
|
||||
# only keeps the path (URI) in url parameter, scheme and netloc are
|
||||
# in remote_service
|
||||
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(url)
|
||||
url = urllib.parse.urlunparse(('', '', path, params, query, fragment))
|
||||
else:
|
||||
logging.warning('service not found in settings.KNOWN_SERVICES for %s', url)
|
||||
|
||||
if remote_service:
|
||||
if isinstance(user, dict):
|
||||
query_params = user.copy()
|
||||
elif not user or not user.is_authenticated:
|
||||
if without_user:
|
||||
query_params = {}
|
||||
else:
|
||||
query_params = {'NameID': '', 'email': ''}
|
||||
else:
|
||||
query_params = {}
|
||||
if federation_key == 'nameid':
|
||||
query_params['NameID'] = user.get_name_id()
|
||||
elif federation_key == 'email':
|
||||
query_params['email'] = user.email
|
||||
else: # 'auto'
|
||||
user_name_id = user.get_name_id()
|
||||
if user_name_id:
|
||||
query_params['NameID'] = user_name_id
|
||||
else:
|
||||
query_params['email'] = user.email
|
||||
|
||||
if remote_service.get('orig'):
|
||||
query_params['orig'] = remote_service.get('orig')
|
||||
|
||||
remote_service_base_url = remote_service.get('url')
|
||||
scheme, netloc, dummy, params, old_query, fragment = urllib.parse.urlparse(
|
||||
remote_service_base_url
|
||||
)
|
||||
|
||||
query = urlencode(query_params)
|
||||
if '?' in url:
|
||||
path, old_query = url.split('?', 1)
|
||||
query += '&' + old_query
|
||||
else:
|
||||
path = url
|
||||
|
||||
url = urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))
|
||||
|
||||
if method == 'GET' and cache_duration:
|
||||
# handle cache
|
||||
params = urlencode(kwargs.get('params', {}))
|
||||
cache_key = hashlib.md5(smart_bytes(url + params)).hexdigest()
|
||||
cache_content = cache.get(cache_key)
|
||||
if cache_content and not invalidate_cache:
|
||||
response = Response()
|
||||
response.status_code = 200
|
||||
response.raw = BytesIO(smart_bytes(cache_content))
|
||||
return response
|
||||
elif raise_if_not_cached:
|
||||
raise NothingInCacheException()
|
||||
|
||||
if remote_service: # sign
|
||||
kwargs['auth'] = PublikSignature(remote_service.get('secret'))
|
||||
|
||||
kwargs['timeout'] = kwargs.get('timeout') or settings.REQUESTS_TIMEOUT
|
||||
|
||||
response = super().request(method, url, **kwargs)
|
||||
if log_errors and (response.status_code // 100 != 2):
|
||||
extra = {}
|
||||
if django_request:
|
||||
extra['request'] = django_request
|
||||
if log_errors == 'warn':
|
||||
logging.warning(
|
||||
'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
|
||||
)
|
||||
else:
|
||||
logging.error(
|
||||
'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
|
||||
)
|
||||
if method == 'GET' and cache_duration and (response.status_code // 100 == 2):
|
||||
cache.set(cache_key, response.content, cache_duration)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
requests = Requests()
|
|
@ -0,0 +1,52 @@
|
|||
# publik-django-templatetags
|
||||
# Copyright (C) 2022 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import base64
|
||||
import datetime
|
||||
import hashlib
|
||||
import hmac
|
||||
import random
|
||||
import urllib.parse
|
||||
|
||||
from django.utils.encoding import smart_bytes
|
||||
from django.utils.http import quote, urlencode
|
||||
|
||||
|
||||
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
|
||||
parsed = urllib.parse.urlparse(url)
|
||||
new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
|
||||
return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
|
||||
|
||||
|
||||
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
|
||||
if timestamp is None:
|
||||
timestamp = datetime.datetime.utcnow()
|
||||
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
|
||||
if nonce is None:
|
||||
nonce = hex(random.getrandbits(128))[2:]
|
||||
new_query = query
|
||||
if new_query:
|
||||
new_query += '&'
|
||||
new_query += urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
|
||||
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
|
||||
new_query += '&signature=' + quote(signature)
|
||||
return new_query
|
||||
|
||||
|
||||
def sign_string(s, key, algo='sha256', timedelta=30):
|
||||
digestmod = getattr(hashlib, algo)
|
||||
hash = hmac.HMAC(smart_bytes(key), digestmod=digestmod, msg=smart_bytes(s))
|
||||
return hash.digest()
|
|
@ -0,0 +1,197 @@
|
|||
# publik-django-templatetags
|
||||
# Copyright (C) 2022 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django.utils.http import urlencode
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
from publik_django_templatetags.utils.requests_wrapper import requests
|
||||
from publik_django_templatetags.wcs.utils import get_wcs_services
|
||||
|
||||
|
||||
def get_default_wcs_service_key():
|
||||
services = get_wcs_services()
|
||||
|
||||
for key, service in services.items():
|
||||
if not service.get('secondary', False):
|
||||
# if secondary is not set or not set to True, return this one
|
||||
return key
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class LazyCardDefObjectsManager:
|
||||
def __init__(self, service_key, card_id, custom_view_id=None, filters=None, user=Ellipsis):
|
||||
self._service_key = service_key
|
||||
self._card_id = card_id
|
||||
self._custom_view_id = custom_view_id
|
||||
|
||||
self._filters = filters or {}
|
||||
self._user = user
|
||||
|
||||
self._cached_resultset = None
|
||||
|
||||
def _clone(self):
|
||||
return LazyCardDefObjectsManager(
|
||||
service_key=self._service_key,
|
||||
card_id=self._card_id,
|
||||
custom_view_id=self._custom_view_id,
|
||||
filters=self._filters,
|
||||
user=self._user,
|
||||
)
|
||||
|
||||
def order_by(self, attribute):
|
||||
qs = self._clone()
|
||||
qs._filters['order_by'] = attribute
|
||||
if not attribute:
|
||||
del qs._filters['order_by']
|
||||
return qs
|
||||
|
||||
def with_custom_view(self, custom_view_id):
|
||||
qs = self._clone()
|
||||
qs._custom_view_id = custom_view_id
|
||||
return qs
|
||||
|
||||
def get_full(self):
|
||||
qs = self._clone()
|
||||
qs._filters['full'] = 'on'
|
||||
return qs
|
||||
|
||||
def access_control(self, user):
|
||||
qs = self._clone()
|
||||
qs._user = user
|
||||
return qs
|
||||
|
||||
@property
|
||||
def count(self):
|
||||
return len(self)
|
||||
|
||||
def filter_by(self, attribute):
|
||||
qs = self._clone()
|
||||
qs.pending_attr = attribute
|
||||
return qs
|
||||
|
||||
def apply_filter_value(self, value):
|
||||
assert self.pending_attr
|
||||
qs = self._clone()
|
||||
if value is None:
|
||||
value = ''
|
||||
if isinstance(value, bool):
|
||||
value = str(value).lower()
|
||||
qs._filters['filter-%s' % self.pending_attr] = value
|
||||
return qs
|
||||
|
||||
def filter_by_internal_id(self, internal_id):
|
||||
qs = self._clone()
|
||||
if internal_id:
|
||||
qs._filters['filter-internal-id'] = internal_id
|
||||
return qs
|
||||
|
||||
def filter_by_number(self, number):
|
||||
qs = self._clone()
|
||||
if number:
|
||||
qs._filters['filter-number'] = number
|
||||
return qs
|
||||
|
||||
def filter_by_user(self, user):
|
||||
qs = self._clone()
|
||||
if user and user.is_authenticated and user.get_name_id():
|
||||
qs._filters['filter-user-uuid'] = user.get_name_id()
|
||||
return qs
|
||||
|
||||
def filter_by_status(self, status):
|
||||
qs = self._clone()
|
||||
if status:
|
||||
qs._filters['filter'] = status
|
||||
return qs
|
||||
|
||||
def _get_results_from_wcs(self):
|
||||
service = get_wcs_services().get(self._service_key)
|
||||
if not service:
|
||||
return []
|
||||
|
||||
api_url = 'api/cards/%s/list' % self._card_id
|
||||
if self._custom_view_id:
|
||||
api_url += '/%s' % self._custom_view_id
|
||||
if self._filters:
|
||||
query = urlencode(self._filters)
|
||||
api_url += '?%s' % query
|
||||
without_user = self._user is Ellipsis # not set
|
||||
try:
|
||||
response = requests.get(
|
||||
api_url,
|
||||
remote_service=service,
|
||||
user=None if without_user else self._user,
|
||||
without_user=without_user,
|
||||
log_errors=False,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except RequestException:
|
||||
return []
|
||||
|
||||
if response.json().get('err') == 1:
|
||||
return []
|
||||
|
||||
return response.json().get('data') or []
|
||||
|
||||
def _populate_cache(self):
|
||||
if self._cached_resultset is not None:
|
||||
return
|
||||
self._cached_resultset = self._get_results_from_wcs()
|
||||
|
||||
def __len__(self):
|
||||
self._populate_cache()
|
||||
return len(self._cached_resultset)
|
||||
|
||||
def __getitem__(self, key):
|
||||
try:
|
||||
if not isinstance(key, slice):
|
||||
int(key)
|
||||
except ValueError:
|
||||
raise TypeError
|
||||
self._populate_cache()
|
||||
return self._cached_resultset[key]
|
||||
|
||||
def __iter__(self):
|
||||
self._populate_cache()
|
||||
yield from self._cached_resultset
|
||||
|
||||
def __nonzero__(self):
|
||||
return any(self)
|
||||
|
||||
|
||||
class LazyCardDef:
|
||||
def __init__(self, slug):
|
||||
if ':' in slug:
|
||||
self.service_key, self.card_id = slug.split(':')[:2]
|
||||
else:
|
||||
self.card_id = slug
|
||||
self.service_key = get_default_wcs_service_key()
|
||||
|
||||
@property
|
||||
def objects(self):
|
||||
return LazyCardDefObjectsManager(self.service_key, self.card_id)
|
||||
|
||||
|
||||
class Cards:
|
||||
def __getattr__(self, attr):
|
||||
try:
|
||||
return LazyCardDef(attr)
|
||||
except KeyError:
|
||||
raise AttributeError(attr)
|
||||
|
||||
|
||||
def cards(request):
|
||||
return {'cards': Cards()}
|
|
@ -0,0 +1,79 @@
|
|||
# publik-django-templatetags
|
||||
# Copyright (C) 2022 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django import template
|
||||
|
||||
register = template.Library()
|
||||
|
||||
|
||||
@register.filter
|
||||
def objects(cards, slug):
|
||||
return getattr(cards, slug).objects
|
||||
|
||||
|
||||
@register.filter
|
||||
def with_custom_view(queryset, custom_view_id):
|
||||
return queryset.with_custom_view(custom_view_id)
|
||||
|
||||
|
||||
@register.filter
|
||||
def get_full(queryset):
|
||||
return queryset.get_full()
|
||||
|
||||
|
||||
@register.filter
|
||||
def access_control(queryset, user):
|
||||
return queryset.access_control(user)
|
||||
|
||||
|
||||
@register.filter
|
||||
def count(queryset):
|
||||
return queryset.count
|
||||
|
||||
|
||||
@register.filter
|
||||
def filter_by(queryset, attribute):
|
||||
return queryset.filter_by(attribute)
|
||||
|
||||
|
||||
@register.filter
|
||||
def filter_value(queryset, value):
|
||||
return queryset.apply_filter_value(value)
|
||||
|
||||
|
||||
@register.filter
|
||||
def filter_by_internal_id(queryset, internal_id):
|
||||
return queryset.filter_by_internal_id(internal_id)
|
||||
|
||||
|
||||
@register.filter
|
||||
def filter_by_number(queryset, number):
|
||||
return queryset.filter_by_number(number)
|
||||
|
||||
|
||||
@register.filter
|
||||
def filter_by_user(queryset, user):
|
||||
return queryset.filter_by_user(user)
|
||||
|
||||
|
||||
@register.filter
|
||||
def filter_by_status(queryset, status):
|
||||
return queryset.filter_by_status(status)
|
||||
|
||||
|
||||
@register.filter
|
||||
def order_by(queryset, attribute):
|
||||
return queryset.order_by(attribute)
|
|
@ -0,0 +1,27 @@
|
|||
# publik-django-templatetags
|
||||
# Copyright (C) 2022 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
|
||||
def is_wcs_enabled(cls):
|
||||
return hasattr(settings, 'KNOWN_SERVICES') and settings.KNOWN_SERVICES.get('wcs')
|
||||
|
||||
|
||||
def get_wcs_services():
|
||||
if not is_wcs_enabled(None):
|
||||
return {}
|
||||
return settings.KNOWN_SERVICES.get('wcs')
|
|
@ -0,0 +1,10 @@
|
|||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def nocache(settings):
|
||||
settings.CACHES = {
|
||||
'default': {
|
||||
'BACKEND': 'django.core.cache.backends.dummy.DummyCache',
|
||||
}
|
||||
}
|
|
@ -29,6 +29,23 @@ KNOWN_SERVICES = {
|
|||
},
|
||||
}
|
||||
|
||||
TEMPLATES = [
|
||||
{
|
||||
'BACKEND': 'django.template.backends.django.DjangoTemplates',
|
||||
'DIRS': [],
|
||||
'APP_DIRS': True,
|
||||
'OPTIONS': {
|
||||
'context_processors': [
|
||||
'publik_django_templatetags.wcs.context_processors.cards',
|
||||
],
|
||||
'builtins': [
|
||||
'publik_django_templatetags.publik.templatetags.publik',
|
||||
'publik_django_templatetags.wcs.templatetags.wcs',
|
||||
],
|
||||
},
|
||||
},
|
||||
]
|
||||
|
||||
REQUESTS_TIMEOUT = 25
|
||||
|
||||
DEBUG = True
|
||||
|
|
|
@ -0,0 +1,409 @@
|
|||
import copy
|
||||
import json
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from django.template import Context, Template
|
||||
from django.test.client import RequestFactory
|
||||
from requests.exceptions import ConnectionError
|
||||
from requests.models import Response
|
||||
|
||||
from publik_django_templatetags.wcs.context_processors import Cards
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def context():
|
||||
ctx = Context(
|
||||
{
|
||||
'cards': Cards(),
|
||||
'request': RequestFactory().get('/'),
|
||||
}
|
||||
)
|
||||
ctx['request'].user = None
|
||||
return ctx
|
||||
|
||||
|
||||
class MockAnonymousUser:
|
||||
is_authenticated = False
|
||||
is_anonymous = True
|
||||
|
||||
|
||||
class MockUser:
|
||||
email = 'foo@example.net'
|
||||
is_authenticated = True
|
||||
is_anonymous = False
|
||||
|
||||
def get_name_id(self):
|
||||
return None
|
||||
|
||||
|
||||
class MockUserWithNameId:
|
||||
email = 'foo@example.net'
|
||||
is_authenticated = True
|
||||
is_anonymous = False
|
||||
|
||||
def get_name_id(self):
|
||||
return 'xyz'
|
||||
|
||||
|
||||
class MockedRequestResponse(mock.Mock):
|
||||
status_code = 200
|
||||
|
||||
def json(self):
|
||||
return json.loads(self.content)
|
||||
|
||||
|
||||
def mocked_requests_send(request, **kwargs):
|
||||
data = [{'id': 1, 'fields': {'foo': 'bar'}}, {'id': 2, 'fields': {'foo': 'baz'}}] # fake result
|
||||
return MockedRequestResponse(content=json.dumps({'data': data}))
|
||||
|
||||
|
||||
def test_context(context):
|
||||
assert 'cards' in context
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_objects(mock_send, settings, context, nocache):
|
||||
# lazy filters
|
||||
t = Template('{{ cards|objects:"foo" }}')
|
||||
assert t.render(context).startswith(
|
||||
'<publik_django_templatetags.wcs.context_processors.LazyCardDefObjectsManager object at'
|
||||
)
|
||||
assert mock_send.call_args_list == [] # lazy
|
||||
t = Template('{{ cards|objects:"default:foo" }}')
|
||||
assert t.render(context).startswith(
|
||||
'<publik_django_templatetags.wcs.context_processors.LazyCardDefObjectsManager object at'
|
||||
)
|
||||
assert mock_send.call_args_list == [] # lazy
|
||||
|
||||
# test filters evaluation
|
||||
t = Template('{% for card in cards|objects:"foo" %}{{ card.id }} {% endfor %}')
|
||||
assert t.render(context) == "1 2 "
|
||||
assert mock_send.call_args_list[0][0][0].url.startswith(
|
||||
'http://127.0.0.1:8999/api/cards/foo/list?'
|
||||
) # primary service
|
||||
t = Template('{{ cards|objects:"default:foo"|list }}')
|
||||
t.render(context)
|
||||
assert mock_send.call_args_list[0][0][0].url.startswith('http://127.0.0.1:8999/api/cards/foo/list?')
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"other:foo"|list }}')
|
||||
t.render(context)
|
||||
assert mock_send.call_args_list[0][0][0].url.startswith('http://127.0.0.2:8999/api/cards/foo/list?')
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"unknown:foo"|list }}')
|
||||
t.render(context)
|
||||
assert mock_send.call_args_list == [] # unknown, not evaluated
|
||||
|
||||
# test card_id with variable
|
||||
context['foobar'] = 'some-slug'
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:foobar|list }}')
|
||||
t.render(context)
|
||||
assert mock_send.call_args_list[0][0][0].url.startswith('http://127.0.0.1:8999/api/cards/some-slug/list?')
|
||||
|
||||
# test with no secondary param
|
||||
KNOWN_SERVICES = copy.deepcopy(settings.KNOWN_SERVICES)
|
||||
KNOWN_SERVICES['wcs'] = {'default': {'url': 'http://127.0.0.3:8999/'}}
|
||||
settings.KNOWN_SERVICES = KNOWN_SERVICES
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"bar"|list }}')
|
||||
t.render(context)
|
||||
assert mock_send.call_args_list[0][0][0].url.startswith('http://127.0.0.3:8999/api/cards/bar/list?')
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_with_custom_view(mock_send, context, nocache):
|
||||
t = Template('{{ cards|objects:"foo"|with_custom_view:"foobar"|list }}')
|
||||
t.render(context)
|
||||
assert mock_send.call_args_list[0][0][0].url.startswith(
|
||||
'http://127.0.0.1:8999/api/cards/foo/list/foobar?'
|
||||
) # primary service
|
||||
t = Template('{{ cards|objects:"default:foo"|with_custom_view:"foobar"|list }}')
|
||||
t.render(context)
|
||||
assert mock_send.call_args_list[0][0][0].url.startswith(
|
||||
'http://127.0.0.1:8999/api/cards/foo/list/foobar?'
|
||||
)
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"other:foo"|with_custom_view:"foobar"|list }}')
|
||||
t.render(context)
|
||||
assert mock_send.call_args_list[0][0][0].url.startswith(
|
||||
'http://127.0.0.2:8999/api/cards/foo/list/foobar?'
|
||||
)
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"unknown:foo"|with_custom_view:"foobar"|list }}')
|
||||
t.render(context)
|
||||
assert mock_send.call_args_list == [] # unknown, not evaluated
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_full(mock_send, context, nocache):
|
||||
t = Template('{{ cards|objects:"foo"|list }}')
|
||||
t.render(context)
|
||||
assert 'full=on&' not in mock_send.call_args_list[0][0][0].url
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|get_full|list }}')
|
||||
t.render(context)
|
||||
assert 'full=on&' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_errors(mock_send, context, nocache):
|
||||
t = Template('{{ cards|objects:"foo"|list }}')
|
||||
|
||||
with mock.patch('publik_django_templatetags.wcs.context_processors.requests.get') as requests_get:
|
||||
mock_resp = Response()
|
||||
mock_resp.status_code = 500
|
||||
requests_get.return_value = mock_resp
|
||||
assert t.render(context) == "[]"
|
||||
|
||||
with mock.patch('publik_django_templatetags.wcs.context_processors.requests.get') as requests_get:
|
||||
requests_get.side_effect = ConnectionError()
|
||||
requests_get.return_value = mock_resp
|
||||
assert t.render(context) == "[]"
|
||||
|
||||
with mock.patch('publik_django_templatetags.wcs.context_processors.requests.get') as requests_get:
|
||||
mock_resp = Response()
|
||||
mock_resp.status_code = 404
|
||||
requests_get.return_value = mock_resp
|
||||
assert t.render(context) == "[]"
|
||||
|
||||
mock_send.side_effect = lambda *a, **k: MockedRequestResponse(content=json.dumps({'err': 1}))
|
||||
assert t.render(context) == "[]"
|
||||
|
||||
mock_send.side_effect = lambda *a, **k: MockedRequestResponse(content=json.dumps({}))
|
||||
assert t.render(context) == "[]"
|
||||
|
||||
mock_send.side_effect = lambda *a, **k: MockedRequestResponse(content=json.dumps({'data': None}))
|
||||
assert t.render(context) == "[]"
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_access_control(mock_send, context, nocache):
|
||||
# no user in context
|
||||
t = Template('{{ cards|objects:"foo"|list }}')
|
||||
t.render(context)
|
||||
assert 'NameID' not in mock_send.call_args_list[0][0][0].url
|
||||
assert 'email' not in mock_send.call_args_list[0][0][0].url
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|access_control:request.user|list }}')
|
||||
t.render(context)
|
||||
assert 'NameID=&' in mock_send.call_args_list[0][0][0].url
|
||||
assert 'email=&' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
# current user in anonymous
|
||||
context['request'].user = MockAnonymousUser()
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|list }}')
|
||||
t.render(context)
|
||||
assert 'NameID' not in mock_send.call_args_list[0][0][0].url
|
||||
assert 'email' not in mock_send.call_args_list[0][0][0].url
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|access_control:request.user|list }}')
|
||||
t.render(context)
|
||||
assert 'NameID=&' in mock_send.call_args_list[0][0][0].url
|
||||
assert 'email=&' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
# current user with uuid
|
||||
context['request'].user = MockUserWithNameId()
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|list }}')
|
||||
t.render(context)
|
||||
assert 'NameID' not in mock_send.call_args_list[0][0][0].url
|
||||
assert 'email' not in mock_send.call_args_list[0][0][0].url
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|access_control:request.user|list }}')
|
||||
t.render(context)
|
||||
assert 'NameID=xyz&' in mock_send.call_args_list[0][0][0].url
|
||||
assert 'email' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
# current user without uuid
|
||||
context['request'].user = MockUser()
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|list }}')
|
||||
t.render(context)
|
||||
assert 'NameID' not in mock_send.call_args_list[0][0][0].url
|
||||
assert 'email' not in mock_send.call_args_list[0][0][0].url
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|access_control:request.user|list }}')
|
||||
t.render(context)
|
||||
assert 'NameID' not in mock_send.call_args_list[0][0][0].url
|
||||
assert 'email=foo%40example.net&' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_count(mock_send, context, nocache):
|
||||
t = Template('{{ cards|objects:"foo"|count }}')
|
||||
assert t.render(context) == "2"
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_filter(mock_send, context, nocache):
|
||||
t = Template('{{ cards|objects:"foo"|filter_by:"foo"|filter_value:"bar"|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-foo=bar&' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template(
|
||||
'{{ cards|objects:"foo"|filter_by:"foo"|filter_value:"bar"|filter_by:"foo2"|filter_value:"bar2"|list }}'
|
||||
)
|
||||
t.render(context)
|
||||
assert 'filter-foo=bar&' in mock_send.call_args_list[0][0][0].url
|
||||
assert 'filter-foo2=bar2&' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
# check boolean
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|filter_by:"foo"|filter_value:True|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-foo=true&' in mock_send.call_args_list[0][0][0].url
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|filter_by:"foo"|filter_value:False|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-foo=false&' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
# check None
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|filter_by:"foo"|filter_value:None|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-foo=&' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_filter_by_internal_id(mock_send, context, nocache):
|
||||
t = Template('{{ cards|objects:"foo"|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-internal-id' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|filter_by_internal_id:None|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-internal-id' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|filter_by_internal_id:""|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-internal-id' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|filter_by_internal_id:"42"|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-internal-id=42&' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_filter_by_number(mock_send, context, nocache):
|
||||
t = Template('{{ cards|objects:"foo"|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-number' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|filter_by_number:None|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-number' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|filter_by_number:""|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-number' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|filter_by_number:"42-35"|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-number=42-35&' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_filter_by_user(mock_send, context, nocache):
|
||||
t = Template('{{ cards|objects:"foo"|filter_by_user:request.user|list }}')
|
||||
t.render(context)
|
||||
assert 'filter-user-uuid' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
context['request'].user = MockAnonymousUser()
|
||||
mock_send.reset_mock()
|
||||
t.render(context)
|
||||
assert 'filter-user-uuid' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
context['request'].user = MockUser()
|
||||
mock_send.reset_mock()
|
||||
t.render(context)
|
||||
assert 'filter-user-uuid' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
context['request'].user = MockUserWithNameId()
|
||||
mock_send.reset_mock()
|
||||
t.render(context)
|
||||
assert 'filter-user-uuid=xyz&' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_filter_by_status(mock_send, context, nocache):
|
||||
t = Template('{{ cards|objects:"foo"|list }}')
|
||||
t.render(context)
|
||||
assert 'filter=&' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|filter_by_status:None|list }}')
|
||||
t.render(context)
|
||||
assert 'filter=&' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|filter_by_status:""|list }}')
|
||||
t.render(context)
|
||||
assert 'filter=&' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{{ cards|objects:"foo"|filter_by_status:"foobar"|list }}')
|
||||
t.render(context)
|
||||
assert 'filter=foobar&' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_getlist(mock_send, context, nocache):
|
||||
t = Template('{% for v in cards|objects:"foo"|getlist:"id" %}{{ v }},{% endfor %}')
|
||||
t.render(context)
|
||||
assert t.render(context) == "1,2,"
|
||||
t = Template('{% for v in cards|objects:"foo"|getlist:"fields" %}{{ v }},{% endfor %}')
|
||||
t.render(context)
|
||||
result = t.render(context)
|
||||
if ''' in result:
|
||||
# django 2.2
|
||||
assert result == "{'foo': 'bar'},{'foo': 'baz'},"
|
||||
else:
|
||||
# django 3.2
|
||||
assert result == "{'foo': 'bar'},{'foo': 'baz'},"
|
||||
t = Template('{% for v in cards|objects:"foo"|getlist:"fields"|getlist:"foo" %}{{ v }},{% endfor %}')
|
||||
t.render(context)
|
||||
assert t.render(context) == "bar,baz,"
|
||||
t = Template('{% for v in cards|objects:"foo"|getlist:"fields"|getlist:"unknown" %}{{ v }},{% endfor %}')
|
||||
t.render(context)
|
||||
assert t.render(context) == "None,None,"
|
||||
|
||||
|
||||
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
|
||||
def test_order_by(mock_send, context, nocache):
|
||||
t = Template('{% for v in cards|objects:"foo" %}{{ v }},{% endfor %}')
|
||||
t.render(context)
|
||||
assert 'order_by' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{% for v in cards|objects:"foo"|order_by:"bar" %}{{ v }},{% endfor %}')
|
||||
t.render(context)
|
||||
assert 'order_by=bar' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{% for v in cards|objects:"foo"|order_by:"-bar" %}{{ v }},{% endfor %}')
|
||||
t.render(context)
|
||||
assert 'order_by=-bar' in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{% for v in cards|objects:"foo"|order_by:"" %}{{ v }},{% endfor %}')
|
||||
t.render(context)
|
||||
assert 'order_by' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{% for v in cards|objects:"foo"|order_by:"bar"|order_by:"" %}{{ v }},{% endfor %}')
|
||||
t.render(context)
|
||||
assert 'order_by' not in mock_send.call_args_list[0][0][0].url
|
||||
|
||||
mock_send.reset_mock()
|
||||
t = Template('{% for v in cards|objects:"foo"|order_by:""|order_by:"bar" %}{{ v }},{% endfor %}')
|
||||
t.render(context)
|
||||
assert 'order_by=bar' in mock_send.call_args_list[0][0][0].url
|
Loading…
Reference in New Issue