wcs: add wcs cards|objects template filters (#64714)

This commit is contained in:
Lauréline Guérin 2022-05-02 09:39:16 +02:00
parent 13cdff1e78
commit a37ceab9cd
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
15 changed files with 1003 additions and 0 deletions

View File

@ -0,0 +1,36 @@
# publik-django-templatetags
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import template
register = template.Library()
@register.filter
def getlist(mapping, key):
if mapping is None:
return []
mapping = list(mapping)
for value in mapping:
try:
yield value.get(key)
except AttributeError:
yield None
@register.filter(name='list')
def as_list(obj):
return list(obj)

View File

@ -0,0 +1,29 @@
# publik-django-templatetags
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib.parse
from django.conf import settings
def get_known_service_for_url(url):
netloc = urllib.parse.urlparse(url).netloc
for services in settings.KNOWN_SERVICES.values():
for service in services.values():
remote_url = service.get('url')
if urllib.parse.urlparse(remote_url).netloc == netloc:
return service
return None

View File

@ -0,0 +1,147 @@
# publik-django-templatetags
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import hashlib
import logging
import urllib.parse
from io import BytesIO
from django.conf import settings
from django.core.cache import cache
from django.utils.encoding import smart_bytes
from django.utils.http import urlencode
from requests import Response
from requests import Session as RequestsSession
from requests.auth import AuthBase
from .misc import get_known_service_for_url
from .signature import sign_url
class NothingInCacheException(Exception):
pass
class PublikSignature(AuthBase):
def __init__(self, secret):
self.secret = secret
def __call__(self, request):
request.url = sign_url(request.url, self.secret)
return request
class Requests(RequestsSession):
def request(self, method, url, **kwargs):
remote_service = kwargs.pop('remote_service', None)
cache_duration = kwargs.pop('cache_duration', 15)
invalidate_cache = kwargs.pop('invalidate_cache', False)
user = kwargs.pop('user', None)
django_request = kwargs.pop('django_request', None)
without_user = kwargs.pop('without_user', False)
federation_key = kwargs.pop('federation_key', 'auto') # 'auto', 'email', 'nameid'
raise_if_not_cached = kwargs.pop('raise_if_not_cached', False)
log_errors = kwargs.pop('log_errors', True)
# don't use persistent cookies
self.cookies.clear()
if remote_service == 'auto':
remote_service = get_known_service_for_url(url)
if remote_service:
# only keeps the path (URI) in url parameter, scheme and netloc are
# in remote_service
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(url)
url = urllib.parse.urlunparse(('', '', path, params, query, fragment))
else:
logging.warning('service not found in settings.KNOWN_SERVICES for %s', url)
if remote_service:
if isinstance(user, dict):
query_params = user.copy()
elif not user or not user.is_authenticated:
if without_user:
query_params = {}
else:
query_params = {'NameID': '', 'email': ''}
else:
query_params = {}
if federation_key == 'nameid':
query_params['NameID'] = user.get_name_id()
elif federation_key == 'email':
query_params['email'] = user.email
else: # 'auto'
user_name_id = user.get_name_id()
if user_name_id:
query_params['NameID'] = user_name_id
else:
query_params['email'] = user.email
if remote_service.get('orig'):
query_params['orig'] = remote_service.get('orig')
remote_service_base_url = remote_service.get('url')
scheme, netloc, dummy, params, old_query, fragment = urllib.parse.urlparse(
remote_service_base_url
)
query = urlencode(query_params)
if '?' in url:
path, old_query = url.split('?', 1)
query += '&' + old_query
else:
path = url
url = urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))
if method == 'GET' and cache_duration:
# handle cache
params = urlencode(kwargs.get('params', {}))
cache_key = hashlib.md5(smart_bytes(url + params)).hexdigest()
cache_content = cache.get(cache_key)
if cache_content and not invalidate_cache:
response = Response()
response.status_code = 200
response.raw = BytesIO(smart_bytes(cache_content))
return response
elif raise_if_not_cached:
raise NothingInCacheException()
if remote_service: # sign
kwargs['auth'] = PublikSignature(remote_service.get('secret'))
kwargs['timeout'] = kwargs.get('timeout') or settings.REQUESTS_TIMEOUT
response = super().request(method, url, **kwargs)
if log_errors and (response.status_code // 100 != 2):
extra = {}
if django_request:
extra['request'] = django_request
if log_errors == 'warn':
logging.warning(
'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
)
else:
logging.error(
'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
)
if method == 'GET' and cache_duration and (response.status_code // 100 == 2):
cache.set(cache_key, response.content, cache_duration)
return response
requests = Requests()

View File

@ -0,0 +1,52 @@
# publik-django-templatetags
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import hashlib
import hmac
import random
import urllib.parse
from django.utils.encoding import smart_bytes
from django.utils.http import quote, urlencode
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
parsed = urllib.parse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
if timestamp is None:
timestamp = datetime.datetime.utcnow()
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
if nonce is None:
nonce = hex(random.getrandbits(128))[2:]
new_query = query
if new_query:
new_query += '&'
new_query += urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&signature=' + quote(signature)
return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
digestmod = getattr(hashlib, algo)
hash = hmac.HMAC(smart_bytes(key), digestmod=digestmod, msg=smart_bytes(s))
return hash.digest()

View File

@ -0,0 +1,197 @@
# publik-django-templatetags
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.utils.http import urlencode
from requests.exceptions import RequestException
from publik_django_templatetags.utils.requests_wrapper import requests
from publik_django_templatetags.wcs.utils import get_wcs_services
def get_default_wcs_service_key():
services = get_wcs_services()
for key, service in services.items():
if not service.get('secondary', False):
# if secondary is not set or not set to True, return this one
return key
return None
class LazyCardDefObjectsManager:
def __init__(self, service_key, card_id, custom_view_id=None, filters=None, user=Ellipsis):
self._service_key = service_key
self._card_id = card_id
self._custom_view_id = custom_view_id
self._filters = filters or {}
self._user = user
self._cached_resultset = None
def _clone(self):
return LazyCardDefObjectsManager(
service_key=self._service_key,
card_id=self._card_id,
custom_view_id=self._custom_view_id,
filters=self._filters,
user=self._user,
)
def order_by(self, attribute):
qs = self._clone()
qs._filters['order_by'] = attribute
if not attribute:
del qs._filters['order_by']
return qs
def with_custom_view(self, custom_view_id):
qs = self._clone()
qs._custom_view_id = custom_view_id
return qs
def get_full(self):
qs = self._clone()
qs._filters['full'] = 'on'
return qs
def access_control(self, user):
qs = self._clone()
qs._user = user
return qs
@property
def count(self):
return len(self)
def filter_by(self, attribute):
qs = self._clone()
qs.pending_attr = attribute
return qs
def apply_filter_value(self, value):
assert self.pending_attr
qs = self._clone()
if value is None:
value = ''
if isinstance(value, bool):
value = str(value).lower()
qs._filters['filter-%s' % self.pending_attr] = value
return qs
def filter_by_internal_id(self, internal_id):
qs = self._clone()
if internal_id:
qs._filters['filter-internal-id'] = internal_id
return qs
def filter_by_number(self, number):
qs = self._clone()
if number:
qs._filters['filter-number'] = number
return qs
def filter_by_user(self, user):
qs = self._clone()
if user and user.is_authenticated and user.get_name_id():
qs._filters['filter-user-uuid'] = user.get_name_id()
return qs
def filter_by_status(self, status):
qs = self._clone()
if status:
qs._filters['filter'] = status
return qs
def _get_results_from_wcs(self):
service = get_wcs_services().get(self._service_key)
if not service:
return []
api_url = 'api/cards/%s/list' % self._card_id
if self._custom_view_id:
api_url += '/%s' % self._custom_view_id
if self._filters:
query = urlencode(self._filters)
api_url += '?%s' % query
without_user = self._user is Ellipsis # not set
try:
response = requests.get(
api_url,
remote_service=service,
user=None if without_user else self._user,
without_user=without_user,
log_errors=False,
)
response.raise_for_status()
except RequestException:
return []
if response.json().get('err') == 1:
return []
return response.json().get('data') or []
def _populate_cache(self):
if self._cached_resultset is not None:
return
self._cached_resultset = self._get_results_from_wcs()
def __len__(self):
self._populate_cache()
return len(self._cached_resultset)
def __getitem__(self, key):
try:
if not isinstance(key, slice):
int(key)
except ValueError:
raise TypeError
self._populate_cache()
return self._cached_resultset[key]
def __iter__(self):
self._populate_cache()
yield from self._cached_resultset
def __nonzero__(self):
return any(self)
class LazyCardDef:
def __init__(self, slug):
if ':' in slug:
self.service_key, self.card_id = slug.split(':')[:2]
else:
self.card_id = slug
self.service_key = get_default_wcs_service_key()
@property
def objects(self):
return LazyCardDefObjectsManager(self.service_key, self.card_id)
class Cards:
def __getattr__(self, attr):
try:
return LazyCardDef(attr)
except KeyError:
raise AttributeError(attr)
def cards(request):
return {'cards': Cards()}

View File

@ -0,0 +1,79 @@
# publik-django-templatetags
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import template
register = template.Library()
@register.filter
def objects(cards, slug):
return getattr(cards, slug).objects
@register.filter
def with_custom_view(queryset, custom_view_id):
return queryset.with_custom_view(custom_view_id)
@register.filter
def get_full(queryset):
return queryset.get_full()
@register.filter
def access_control(queryset, user):
return queryset.access_control(user)
@register.filter
def count(queryset):
return queryset.count
@register.filter
def filter_by(queryset, attribute):
return queryset.filter_by(attribute)
@register.filter
def filter_value(queryset, value):
return queryset.apply_filter_value(value)
@register.filter
def filter_by_internal_id(queryset, internal_id):
return queryset.filter_by_internal_id(internal_id)
@register.filter
def filter_by_number(queryset, number):
return queryset.filter_by_number(number)
@register.filter
def filter_by_user(queryset, user):
return queryset.filter_by_user(user)
@register.filter
def filter_by_status(queryset, status):
return queryset.filter_by_status(status)
@register.filter
def order_by(queryset, attribute):
return queryset.order_by(attribute)

View File

@ -0,0 +1,27 @@
# publik-django-templatetags
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.conf import settings
def is_wcs_enabled(cls):
return hasattr(settings, 'KNOWN_SERVICES') and settings.KNOWN_SERVICES.get('wcs')
def get_wcs_services():
if not is_wcs_enabled(None):
return {}
return settings.KNOWN_SERVICES.get('wcs')

10
tests/conftest.py Normal file
View File

@ -0,0 +1,10 @@
import pytest
@pytest.fixture
def nocache(settings):
settings.CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.dummy.DummyCache',
}
}

View File

@ -29,6 +29,23 @@ KNOWN_SERVICES = {
},
}
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'publik_django_templatetags.wcs.context_processors.cards',
],
'builtins': [
'publik_django_templatetags.publik.templatetags.publik',
'publik_django_templatetags.wcs.templatetags.wcs',
],
},
},
]
REQUESTS_TIMEOUT = 25
DEBUG = True

409
tests/test_wcs.py Normal file
View File

@ -0,0 +1,409 @@
import copy
import json
from unittest import mock
import pytest
from django.template import Context, Template
from django.test.client import RequestFactory
from requests.exceptions import ConnectionError
from requests.models import Response
from publik_django_templatetags.wcs.context_processors import Cards
@pytest.fixture
def context():
ctx = Context(
{
'cards': Cards(),
'request': RequestFactory().get('/'),
}
)
ctx['request'].user = None
return ctx
class MockAnonymousUser:
is_authenticated = False
is_anonymous = True
class MockUser:
email = 'foo@example.net'
is_authenticated = True
is_anonymous = False
def get_name_id(self):
return None
class MockUserWithNameId:
email = 'foo@example.net'
is_authenticated = True
is_anonymous = False
def get_name_id(self):
return 'xyz'
class MockedRequestResponse(mock.Mock):
status_code = 200
def json(self):
return json.loads(self.content)
def mocked_requests_send(request, **kwargs):
data = [{'id': 1, 'fields': {'foo': 'bar'}}, {'id': 2, 'fields': {'foo': 'baz'}}] # fake result
return MockedRequestResponse(content=json.dumps({'data': data}))
def test_context(context):
assert 'cards' in context
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_objects(mock_send, settings, context, nocache):
# lazy filters
t = Template('{{ cards|objects:"foo" }}')
assert t.render(context).startswith(
'&lt;publik_django_templatetags.wcs.context_processors.LazyCardDefObjectsManager object at'
)
assert mock_send.call_args_list == [] # lazy
t = Template('{{ cards|objects:"default:foo" }}')
assert t.render(context).startswith(
'&lt;publik_django_templatetags.wcs.context_processors.LazyCardDefObjectsManager object at'
)
assert mock_send.call_args_list == [] # lazy
# test filters evaluation
t = Template('{% for card in cards|objects:"foo" %}{{ card.id }} {% endfor %}')
assert t.render(context) == "1 2 "
assert mock_send.call_args_list[0][0][0].url.startswith(
'http://127.0.0.1:8999/api/cards/foo/list?'
) # primary service
t = Template('{{ cards|objects:"default:foo"|list }}')
t.render(context)
assert mock_send.call_args_list[0][0][0].url.startswith('http://127.0.0.1:8999/api/cards/foo/list?')
mock_send.reset_mock()
t = Template('{{ cards|objects:"other:foo"|list }}')
t.render(context)
assert mock_send.call_args_list[0][0][0].url.startswith('http://127.0.0.2:8999/api/cards/foo/list?')
mock_send.reset_mock()
t = Template('{{ cards|objects:"unknown:foo"|list }}')
t.render(context)
assert mock_send.call_args_list == [] # unknown, not evaluated
# test card_id with variable
context['foobar'] = 'some-slug'
mock_send.reset_mock()
t = Template('{{ cards|objects:foobar|list }}')
t.render(context)
assert mock_send.call_args_list[0][0][0].url.startswith('http://127.0.0.1:8999/api/cards/some-slug/list?')
# test with no secondary param
KNOWN_SERVICES = copy.deepcopy(settings.KNOWN_SERVICES)
KNOWN_SERVICES['wcs'] = {'default': {'url': 'http://127.0.0.3:8999/'}}
settings.KNOWN_SERVICES = KNOWN_SERVICES
mock_send.reset_mock()
t = Template('{{ cards|objects:"bar"|list }}')
t.render(context)
assert mock_send.call_args_list[0][0][0].url.startswith('http://127.0.0.3:8999/api/cards/bar/list?')
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_with_custom_view(mock_send, context, nocache):
t = Template('{{ cards|objects:"foo"|with_custom_view:"foobar"|list }}')
t.render(context)
assert mock_send.call_args_list[0][0][0].url.startswith(
'http://127.0.0.1:8999/api/cards/foo/list/foobar?'
) # primary service
t = Template('{{ cards|objects:"default:foo"|with_custom_view:"foobar"|list }}')
t.render(context)
assert mock_send.call_args_list[0][0][0].url.startswith(
'http://127.0.0.1:8999/api/cards/foo/list/foobar?'
)
mock_send.reset_mock()
t = Template('{{ cards|objects:"other:foo"|with_custom_view:"foobar"|list }}')
t.render(context)
assert mock_send.call_args_list[0][0][0].url.startswith(
'http://127.0.0.2:8999/api/cards/foo/list/foobar?'
)
mock_send.reset_mock()
t = Template('{{ cards|objects:"unknown:foo"|with_custom_view:"foobar"|list }}')
t.render(context)
assert mock_send.call_args_list == [] # unknown, not evaluated
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_full(mock_send, context, nocache):
t = Template('{{ cards|objects:"foo"|list }}')
t.render(context)
assert 'full=on&' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|get_full|list }}')
t.render(context)
assert 'full=on&' in mock_send.call_args_list[0][0][0].url
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_errors(mock_send, context, nocache):
t = Template('{{ cards|objects:"foo"|list }}')
with mock.patch('publik_django_templatetags.wcs.context_processors.requests.get') as requests_get:
mock_resp = Response()
mock_resp.status_code = 500
requests_get.return_value = mock_resp
assert t.render(context) == "[]"
with mock.patch('publik_django_templatetags.wcs.context_processors.requests.get') as requests_get:
requests_get.side_effect = ConnectionError()
requests_get.return_value = mock_resp
assert t.render(context) == "[]"
with mock.patch('publik_django_templatetags.wcs.context_processors.requests.get') as requests_get:
mock_resp = Response()
mock_resp.status_code = 404
requests_get.return_value = mock_resp
assert t.render(context) == "[]"
mock_send.side_effect = lambda *a, **k: MockedRequestResponse(content=json.dumps({'err': 1}))
assert t.render(context) == "[]"
mock_send.side_effect = lambda *a, **k: MockedRequestResponse(content=json.dumps({}))
assert t.render(context) == "[]"
mock_send.side_effect = lambda *a, **k: MockedRequestResponse(content=json.dumps({'data': None}))
assert t.render(context) == "[]"
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_access_control(mock_send, context, nocache):
# no user in context
t = Template('{{ cards|objects:"foo"|list }}')
t.render(context)
assert 'NameID' not in mock_send.call_args_list[0][0][0].url
assert 'email' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|access_control:request.user|list }}')
t.render(context)
assert 'NameID=&' in mock_send.call_args_list[0][0][0].url
assert 'email=&' in mock_send.call_args_list[0][0][0].url
# current user in anonymous
context['request'].user = MockAnonymousUser()
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|list }}')
t.render(context)
assert 'NameID' not in mock_send.call_args_list[0][0][0].url
assert 'email' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|access_control:request.user|list }}')
t.render(context)
assert 'NameID=&' in mock_send.call_args_list[0][0][0].url
assert 'email=&' in mock_send.call_args_list[0][0][0].url
# current user with uuid
context['request'].user = MockUserWithNameId()
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|list }}')
t.render(context)
assert 'NameID' not in mock_send.call_args_list[0][0][0].url
assert 'email' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|access_control:request.user|list }}')
t.render(context)
assert 'NameID=xyz&' in mock_send.call_args_list[0][0][0].url
assert 'email' not in mock_send.call_args_list[0][0][0].url
# current user without uuid
context['request'].user = MockUser()
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|list }}')
t.render(context)
assert 'NameID' not in mock_send.call_args_list[0][0][0].url
assert 'email' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|access_control:request.user|list }}')
t.render(context)
assert 'NameID' not in mock_send.call_args_list[0][0][0].url
assert 'email=foo%40example.net&' in mock_send.call_args_list[0][0][0].url
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_count(mock_send, context, nocache):
t = Template('{{ cards|objects:"foo"|count }}')
assert t.render(context) == "2"
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_filter(mock_send, context, nocache):
t = Template('{{ cards|objects:"foo"|filter_by:"foo"|filter_value:"bar"|list }}')
t.render(context)
assert 'filter-foo=bar&' in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template(
'{{ cards|objects:"foo"|filter_by:"foo"|filter_value:"bar"|filter_by:"foo2"|filter_value:"bar2"|list }}'
)
t.render(context)
assert 'filter-foo=bar&' in mock_send.call_args_list[0][0][0].url
assert 'filter-foo2=bar2&' in mock_send.call_args_list[0][0][0].url
# check boolean
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|filter_by:"foo"|filter_value:True|list }}')
t.render(context)
assert 'filter-foo=true&' in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|filter_by:"foo"|filter_value:False|list }}')
t.render(context)
assert 'filter-foo=false&' in mock_send.call_args_list[0][0][0].url
# check None
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|filter_by:"foo"|filter_value:None|list }}')
t.render(context)
assert 'filter-foo=&' in mock_send.call_args_list[0][0][0].url
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_filter_by_internal_id(mock_send, context, nocache):
t = Template('{{ cards|objects:"foo"|list }}')
t.render(context)
assert 'filter-internal-id' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|filter_by_internal_id:None|list }}')
t.render(context)
assert 'filter-internal-id' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|filter_by_internal_id:""|list }}')
t.render(context)
assert 'filter-internal-id' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|filter_by_internal_id:"42"|list }}')
t.render(context)
assert 'filter-internal-id=42&' in mock_send.call_args_list[0][0][0].url
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_filter_by_number(mock_send, context, nocache):
t = Template('{{ cards|objects:"foo"|list }}')
t.render(context)
assert 'filter-number' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|filter_by_number:None|list }}')
t.render(context)
assert 'filter-number' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|filter_by_number:""|list }}')
t.render(context)
assert 'filter-number' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|filter_by_number:"42-35"|list }}')
t.render(context)
assert 'filter-number=42-35&' in mock_send.call_args_list[0][0][0].url
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_filter_by_user(mock_send, context, nocache):
t = Template('{{ cards|objects:"foo"|filter_by_user:request.user|list }}')
t.render(context)
assert 'filter-user-uuid' not in mock_send.call_args_list[0][0][0].url
context['request'].user = MockAnonymousUser()
mock_send.reset_mock()
t.render(context)
assert 'filter-user-uuid' not in mock_send.call_args_list[0][0][0].url
context['request'].user = MockUser()
mock_send.reset_mock()
t.render(context)
assert 'filter-user-uuid' not in mock_send.call_args_list[0][0][0].url
context['request'].user = MockUserWithNameId()
mock_send.reset_mock()
t.render(context)
assert 'filter-user-uuid=xyz&' in mock_send.call_args_list[0][0][0].url
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_filter_by_status(mock_send, context, nocache):
t = Template('{{ cards|objects:"foo"|list }}')
t.render(context)
assert 'filter=&' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|filter_by_status:None|list }}')
t.render(context)
assert 'filter=&' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|filter_by_status:""|list }}')
t.render(context)
assert 'filter=&' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{{ cards|objects:"foo"|filter_by_status:"foobar"|list }}')
t.render(context)
assert 'filter=foobar&' in mock_send.call_args_list[0][0][0].url
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_getlist(mock_send, context, nocache):
t = Template('{% for v in cards|objects:"foo"|getlist:"id" %}{{ v }},{% endfor %}')
t.render(context)
assert t.render(context) == "1,2,"
t = Template('{% for v in cards|objects:"foo"|getlist:"fields" %}{{ v }},{% endfor %}')
t.render(context)
result = t.render(context)
if '&#39;' in result:
# django 2.2
assert result == "{&#39;foo&#39;: &#39;bar&#39;},{&#39;foo&#39;: &#39;baz&#39;},"
else:
# django 3.2
assert result == "{&#x27;foo&#x27;: &#x27;bar&#x27;},{&#x27;foo&#x27;: &#x27;baz&#x27;},"
t = Template('{% for v in cards|objects:"foo"|getlist:"fields"|getlist:"foo" %}{{ v }},{% endfor %}')
t.render(context)
assert t.render(context) == "bar,baz,"
t = Template('{% for v in cards|objects:"foo"|getlist:"fields"|getlist:"unknown" %}{{ v }},{% endfor %}')
t.render(context)
assert t.render(context) == "None,None,"
@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
def test_order_by(mock_send, context, nocache):
t = Template('{% for v in cards|objects:"foo" %}{{ v }},{% endfor %}')
t.render(context)
assert 'order_by' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{% for v in cards|objects:"foo"|order_by:"bar" %}{{ v }},{% endfor %}')
t.render(context)
assert 'order_by=bar' in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{% for v in cards|objects:"foo"|order_by:"-bar" %}{{ v }},{% endfor %}')
t.render(context)
assert 'order_by=-bar' in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{% for v in cards|objects:"foo"|order_by:"" %}{{ v }},{% endfor %}')
t.render(context)
assert 'order_by' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{% for v in cards|objects:"foo"|order_by:"bar"|order_by:"" %}{{ v }},{% endfor %}')
t.render(context)
assert 'order_by' not in mock_send.call_args_list[0][0][0].url
mock_send.reset_mock()
t = Template('{% for v in cards|objects:"foo"|order_by:""|order_by:"bar" %}{{ v }},{% endfor %}')
t.render(context)
assert 'order_by=bar' in mock_send.call_args_list[0][0][0].url