diff --git a/publik_django_templatetags/publik/__init__.py b/publik_django_templatetags/publik/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/publik_django_templatetags/publik/templatetags/__init__.py b/publik_django_templatetags/publik/templatetags/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/publik_django_templatetags/publik/templatetags/publik.py b/publik_django_templatetags/publik/templatetags/publik.py
new file mode 100644
index 0000000..d5a1617
--- /dev/null
+++ b/publik_django_templatetags/publik/templatetags/publik.py
@@ -0,0 +1,36 @@
+# publik-django-templatetags
+# Copyright (C) 2022 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from django import template
+
+register = template.Library()
+
+
+@register.filter
+def getlist(mapping, key):
+ if mapping is None:
+ return []
+ mapping = list(mapping)
+ for value in mapping:
+ try:
+ yield value.get(key)
+ except AttributeError:
+ yield None
+
+
+@register.filter(name='list')
+def as_list(obj):
+ return list(obj)
diff --git a/publik_django_templatetags/utils/__init__.py b/publik_django_templatetags/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/publik_django_templatetags/utils/misc.py b/publik_django_templatetags/utils/misc.py
new file mode 100644
index 0000000..1998341
--- /dev/null
+++ b/publik_django_templatetags/utils/misc.py
@@ -0,0 +1,29 @@
+# publik-django-templatetags
+# Copyright (C) 2022 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import urllib.parse
+
+from django.conf import settings
+
+
+def get_known_service_for_url(url):
+ netloc = urllib.parse.urlparse(url).netloc
+ for services in settings.KNOWN_SERVICES.values():
+ for service in services.values():
+ remote_url = service.get('url')
+ if urllib.parse.urlparse(remote_url).netloc == netloc:
+ return service
+ return None
diff --git a/publik_django_templatetags/utils/requests_wrapper.py b/publik_django_templatetags/utils/requests_wrapper.py
new file mode 100644
index 0000000..075a102
--- /dev/null
+++ b/publik_django_templatetags/utils/requests_wrapper.py
@@ -0,0 +1,147 @@
+# publik-django-templatetags
+# Copyright (C) 2022 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import hashlib
+import logging
+import urllib.parse
+from io import BytesIO
+
+from django.conf import settings
+from django.core.cache import cache
+from django.utils.encoding import smart_bytes
+from django.utils.http import urlencode
+from requests import Response
+from requests import Session as RequestsSession
+from requests.auth import AuthBase
+
+from .misc import get_known_service_for_url
+from .signature import sign_url
+
+
+class NothingInCacheException(Exception):
+ pass
+
+
+class PublikSignature(AuthBase):
+ def __init__(self, secret):
+ self.secret = secret
+
+ def __call__(self, request):
+ request.url = sign_url(request.url, self.secret)
+ return request
+
+
+class Requests(RequestsSession):
+ def request(self, method, url, **kwargs):
+ remote_service = kwargs.pop('remote_service', None)
+ cache_duration = kwargs.pop('cache_duration', 15)
+ invalidate_cache = kwargs.pop('invalidate_cache', False)
+ user = kwargs.pop('user', None)
+ django_request = kwargs.pop('django_request', None)
+ without_user = kwargs.pop('without_user', False)
+ federation_key = kwargs.pop('federation_key', 'auto') # 'auto', 'email', 'nameid'
+ raise_if_not_cached = kwargs.pop('raise_if_not_cached', False)
+ log_errors = kwargs.pop('log_errors', True)
+
+ # don't use persistent cookies
+ self.cookies.clear()
+
+ if remote_service == 'auto':
+ remote_service = get_known_service_for_url(url)
+ if remote_service:
+ # only keeps the path (URI) in url parameter, scheme and netloc are
+ # in remote_service
+ scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(url)
+ url = urllib.parse.urlunparse(('', '', path, params, query, fragment))
+ else:
+ logging.warning('service not found in settings.KNOWN_SERVICES for %s', url)
+
+ if remote_service:
+ if isinstance(user, dict):
+ query_params = user.copy()
+ elif not user or not user.is_authenticated:
+ if without_user:
+ query_params = {}
+ else:
+ query_params = {'NameID': '', 'email': ''}
+ else:
+ query_params = {}
+ if federation_key == 'nameid':
+ query_params['NameID'] = user.get_name_id()
+ elif federation_key == 'email':
+ query_params['email'] = user.email
+ else: # 'auto'
+ user_name_id = user.get_name_id()
+ if user_name_id:
+ query_params['NameID'] = user_name_id
+ else:
+ query_params['email'] = user.email
+
+ if remote_service.get('orig'):
+ query_params['orig'] = remote_service.get('orig')
+
+ remote_service_base_url = remote_service.get('url')
+ scheme, netloc, dummy, params, old_query, fragment = urllib.parse.urlparse(
+ remote_service_base_url
+ )
+
+ query = urlencode(query_params)
+ if '?' in url:
+ path, old_query = url.split('?', 1)
+ query += '&' + old_query
+ else:
+ path = url
+
+ url = urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))
+
+ if method == 'GET' and cache_duration:
+ # handle cache
+ params = urlencode(kwargs.get('params', {}))
+ cache_key = hashlib.md5(smart_bytes(url + params)).hexdigest()
+ cache_content = cache.get(cache_key)
+ if cache_content and not invalidate_cache:
+ response = Response()
+ response.status_code = 200
+ response.raw = BytesIO(smart_bytes(cache_content))
+ return response
+ elif raise_if_not_cached:
+ raise NothingInCacheException()
+
+ if remote_service: # sign
+ kwargs['auth'] = PublikSignature(remote_service.get('secret'))
+
+ kwargs['timeout'] = kwargs.get('timeout') or settings.REQUESTS_TIMEOUT
+
+ response = super().request(method, url, **kwargs)
+ if log_errors and (response.status_code // 100 != 2):
+ extra = {}
+ if django_request:
+ extra['request'] = django_request
+ if log_errors == 'warn':
+ logging.warning(
+ 'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
+ )
+ else:
+ logging.error(
+ 'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
+ )
+ if method == 'GET' and cache_duration and (response.status_code // 100 == 2):
+ cache.set(cache_key, response.content, cache_duration)
+
+ return response
+
+
+requests = Requests()
diff --git a/publik_django_templatetags/utils/signature.py b/publik_django_templatetags/utils/signature.py
new file mode 100644
index 0000000..640e8c7
--- /dev/null
+++ b/publik_django_templatetags/utils/signature.py
@@ -0,0 +1,52 @@
+# publik-django-templatetags
+# Copyright (C) 2022 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import base64
+import datetime
+import hashlib
+import hmac
+import random
+import urllib.parse
+
+from django.utils.encoding import smart_bytes
+from django.utils.http import quote, urlencode
+
+
+def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
+ parsed = urllib.parse.urlparse(url)
+ new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
+ return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
+
+
+def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
+ if timestamp is None:
+ timestamp = datetime.datetime.utcnow()
+ timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
+ if nonce is None:
+ nonce = hex(random.getrandbits(128))[2:]
+ new_query = query
+ if new_query:
+ new_query += '&'
+ new_query += urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
+ signature = base64.b64encode(sign_string(new_query, key, algo=algo))
+ new_query += '&signature=' + quote(signature)
+ return new_query
+
+
+def sign_string(s, key, algo='sha256', timedelta=30):
+ digestmod = getattr(hashlib, algo)
+ hash = hmac.HMAC(smart_bytes(key), digestmod=digestmod, msg=smart_bytes(s))
+ return hash.digest()
diff --git a/publik_django_templatetags/wcs/__init__.py b/publik_django_templatetags/wcs/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/publik_django_templatetags/wcs/context_processors.py b/publik_django_templatetags/wcs/context_processors.py
new file mode 100644
index 0000000..e8ad0ac
--- /dev/null
+++ b/publik_django_templatetags/wcs/context_processors.py
@@ -0,0 +1,197 @@
+# publik-django-templatetags
+# Copyright (C) 2022 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from django.utils.http import urlencode
+from requests.exceptions import RequestException
+
+from publik_django_templatetags.utils.requests_wrapper import requests
+from publik_django_templatetags.wcs.utils import get_wcs_services
+
+
+def get_default_wcs_service_key():
+ services = get_wcs_services()
+
+ for key, service in services.items():
+ if not service.get('secondary', False):
+ # if secondary is not set or not set to True, return this one
+ return key
+
+ return None
+
+
+class LazyCardDefObjectsManager:
+ def __init__(self, service_key, card_id, custom_view_id=None, filters=None, user=Ellipsis):
+ self._service_key = service_key
+ self._card_id = card_id
+ self._custom_view_id = custom_view_id
+
+ self._filters = filters or {}
+ self._user = user
+
+ self._cached_resultset = None
+
+ def _clone(self):
+ return LazyCardDefObjectsManager(
+ service_key=self._service_key,
+ card_id=self._card_id,
+ custom_view_id=self._custom_view_id,
+ filters=self._filters,
+ user=self._user,
+ )
+
+ def order_by(self, attribute):
+ qs = self._clone()
+ qs._filters['order_by'] = attribute
+ if not attribute:
+ del qs._filters['order_by']
+ return qs
+
+ def with_custom_view(self, custom_view_id):
+ qs = self._clone()
+ qs._custom_view_id = custom_view_id
+ return qs
+
+ def get_full(self):
+ qs = self._clone()
+ qs._filters['full'] = 'on'
+ return qs
+
+ def access_control(self, user):
+ qs = self._clone()
+ qs._user = user
+ return qs
+
+ @property
+ def count(self):
+ return len(self)
+
+ def filter_by(self, attribute):
+ qs = self._clone()
+ qs.pending_attr = attribute
+ return qs
+
+ def apply_filter_value(self, value):
+ assert self.pending_attr
+ qs = self._clone()
+ if value is None:
+ value = ''
+ if isinstance(value, bool):
+ value = str(value).lower()
+ qs._filters['filter-%s' % self.pending_attr] = value
+ return qs
+
+ def filter_by_internal_id(self, internal_id):
+ qs = self._clone()
+ if internal_id:
+ qs._filters['filter-internal-id'] = internal_id
+ return qs
+
+ def filter_by_number(self, number):
+ qs = self._clone()
+ if number:
+ qs._filters['filter-number'] = number
+ return qs
+
+ def filter_by_user(self, user):
+ qs = self._clone()
+ if user and user.is_authenticated and user.get_name_id():
+ qs._filters['filter-user-uuid'] = user.get_name_id()
+ return qs
+
+ def filter_by_status(self, status):
+ qs = self._clone()
+ if status:
+ qs._filters['filter'] = status
+ return qs
+
+ def _get_results_from_wcs(self):
+ service = get_wcs_services().get(self._service_key)
+ if not service:
+ return []
+
+ api_url = 'api/cards/%s/list' % self._card_id
+ if self._custom_view_id:
+ api_url += '/%s' % self._custom_view_id
+ if self._filters:
+ query = urlencode(self._filters)
+ api_url += '?%s' % query
+ without_user = self._user is Ellipsis # not set
+ try:
+ response = requests.get(
+ api_url,
+ remote_service=service,
+ user=None if without_user else self._user,
+ without_user=without_user,
+ log_errors=False,
+ )
+ response.raise_for_status()
+ except RequestException:
+ return []
+
+ if response.json().get('err') == 1:
+ return []
+
+ return response.json().get('data') or []
+
+ def _populate_cache(self):
+ if self._cached_resultset is not None:
+ return
+ self._cached_resultset = self._get_results_from_wcs()
+
+ def __len__(self):
+ self._populate_cache()
+ return len(self._cached_resultset)
+
+ def __getitem__(self, key):
+ try:
+ if not isinstance(key, slice):
+ int(key)
+ except ValueError:
+ raise TypeError
+ self._populate_cache()
+ return self._cached_resultset[key]
+
+ def __iter__(self):
+ self._populate_cache()
+ yield from self._cached_resultset
+
+ def __nonzero__(self):
+ return any(self)
+
+
+class LazyCardDef:
+ def __init__(self, slug):
+ if ':' in slug:
+ self.service_key, self.card_id = slug.split(':')[:2]
+ else:
+ self.card_id = slug
+ self.service_key = get_default_wcs_service_key()
+
+ @property
+ def objects(self):
+ return LazyCardDefObjectsManager(self.service_key, self.card_id)
+
+
+class Cards:
+ def __getattr__(self, attr):
+ try:
+ return LazyCardDef(attr)
+ except KeyError:
+ raise AttributeError(attr)
+
+
+def cards(request):
+ return {'cards': Cards()}
diff --git a/publik_django_templatetags/wcs/templatetags/__init__.py b/publik_django_templatetags/wcs/templatetags/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/publik_django_templatetags/wcs/templatetags/wcs.py b/publik_django_templatetags/wcs/templatetags/wcs.py
new file mode 100644
index 0000000..eb18ddf
--- /dev/null
+++ b/publik_django_templatetags/wcs/templatetags/wcs.py
@@ -0,0 +1,79 @@
+# publik-django-templatetags
+# Copyright (C) 2022 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from django import template
+
+register = template.Library()
+
+
+@register.filter
+def objects(cards, slug):
+ return getattr(cards, slug).objects
+
+
+@register.filter
+def with_custom_view(queryset, custom_view_id):
+ return queryset.with_custom_view(custom_view_id)
+
+
+@register.filter
+def get_full(queryset):
+ return queryset.get_full()
+
+
+@register.filter
+def access_control(queryset, user):
+ return queryset.access_control(user)
+
+
+@register.filter
+def count(queryset):
+ return queryset.count
+
+
+@register.filter
+def filter_by(queryset, attribute):
+ return queryset.filter_by(attribute)
+
+
+@register.filter
+def filter_value(queryset, value):
+ return queryset.apply_filter_value(value)
+
+
+@register.filter
+def filter_by_internal_id(queryset, internal_id):
+ return queryset.filter_by_internal_id(internal_id)
+
+
+@register.filter
+def filter_by_number(queryset, number):
+ return queryset.filter_by_number(number)
+
+
+@register.filter
+def filter_by_user(queryset, user):
+ return queryset.filter_by_user(user)
+
+
+@register.filter
+def filter_by_status(queryset, status):
+ return queryset.filter_by_status(status)
+
+
+@register.filter
+def order_by(queryset, attribute):
+ return queryset.order_by(attribute)
diff --git a/publik_django_templatetags/wcs/utils.py b/publik_django_templatetags/wcs/utils.py
new file mode 100644
index 0000000..4562977
--- /dev/null
+++ b/publik_django_templatetags/wcs/utils.py
@@ -0,0 +1,27 @@
+# publik-django-templatetags
+# Copyright (C) 2022 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from django.conf import settings
+
+
+def is_wcs_enabled(cls):
+ return hasattr(settings, 'KNOWN_SERVICES') and settings.KNOWN_SERVICES.get('wcs')
+
+
+def get_wcs_services():
+ if not is_wcs_enabled(None):
+ return {}
+ return settings.KNOWN_SERVICES.get('wcs')
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..342da17
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,10 @@
+import pytest
+
+
+@pytest.fixture
+def nocache(settings):
+ settings.CACHES = {
+ 'default': {
+ 'BACKEND': 'django.core.cache.backends.dummy.DummyCache',
+ }
+ }
diff --git a/tests/project/settings.py b/tests/project/settings.py
index a820ed6..5d0117a 100644
--- a/tests/project/settings.py
+++ b/tests/project/settings.py
@@ -29,6 +29,23 @@ KNOWN_SERVICES = {
},
}
+TEMPLATES = [
+ {
+ 'BACKEND': 'django.template.backends.django.DjangoTemplates',
+ 'DIRS': [],
+ 'APP_DIRS': True,
+ 'OPTIONS': {
+ 'context_processors': [
+ 'publik_django_templatetags.wcs.context_processors.cards',
+ ],
+ 'builtins': [
+ 'publik_django_templatetags.publik.templatetags.publik',
+ 'publik_django_templatetags.wcs.templatetags.wcs',
+ ],
+ },
+ },
+]
+
REQUESTS_TIMEOUT = 25
DEBUG = True
diff --git a/tests/test_wcs.py b/tests/test_wcs.py
new file mode 100644
index 0000000..98fb55a
--- /dev/null
+++ b/tests/test_wcs.py
@@ -0,0 +1,409 @@
+import copy
+import json
+from unittest import mock
+
+import pytest
+from django.template import Context, Template
+from django.test.client import RequestFactory
+from requests.exceptions import ConnectionError
+from requests.models import Response
+
+from publik_django_templatetags.wcs.context_processors import Cards
+
+
+@pytest.fixture
+def context():
+ ctx = Context(
+ {
+ 'cards': Cards(),
+ 'request': RequestFactory().get('/'),
+ }
+ )
+ ctx['request'].user = None
+ return ctx
+
+
+class MockAnonymousUser:
+ is_authenticated = False
+ is_anonymous = True
+
+
+class MockUser:
+ email = 'foo@example.net'
+ is_authenticated = True
+ is_anonymous = False
+
+ def get_name_id(self):
+ return None
+
+
+class MockUserWithNameId:
+ email = 'foo@example.net'
+ is_authenticated = True
+ is_anonymous = False
+
+ def get_name_id(self):
+ return 'xyz'
+
+
+class MockedRequestResponse(mock.Mock):
+ status_code = 200
+
+ def json(self):
+ return json.loads(self.content)
+
+
+def mocked_requests_send(request, **kwargs):
+ data = [{'id': 1, 'fields': {'foo': 'bar'}}, {'id': 2, 'fields': {'foo': 'baz'}}] # fake result
+ return MockedRequestResponse(content=json.dumps({'data': data}))
+
+
+def test_context(context):
+ assert 'cards' in context
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_objects(mock_send, settings, context, nocache):
+ # lazy filters
+ t = Template('{{ cards|objects:"foo" }}')
+ assert t.render(context).startswith(
+ '<publik_django_templatetags.wcs.context_processors.LazyCardDefObjectsManager object at'
+ )
+ assert mock_send.call_args_list == [] # lazy
+ t = Template('{{ cards|objects:"default:foo" }}')
+ assert t.render(context).startswith(
+ '<publik_django_templatetags.wcs.context_processors.LazyCardDefObjectsManager object at'
+ )
+ assert mock_send.call_args_list == [] # lazy
+
+ # test filters evaluation
+ t = Template('{% for card in cards|objects:"foo" %}{{ card.id }} {% endfor %}')
+ assert t.render(context) == "1 2 "
+ assert mock_send.call_args_list[0][0][0].url.startswith(
+ 'http://127.0.0.1:8999/api/cards/foo/list?'
+ ) # primary service
+ t = Template('{{ cards|objects:"default:foo"|list }}')
+ t.render(context)
+ assert mock_send.call_args_list[0][0][0].url.startswith('http://127.0.0.1:8999/api/cards/foo/list?')
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"other:foo"|list }}')
+ t.render(context)
+ assert mock_send.call_args_list[0][0][0].url.startswith('http://127.0.0.2:8999/api/cards/foo/list?')
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"unknown:foo"|list }}')
+ t.render(context)
+ assert mock_send.call_args_list == [] # unknown, not evaluated
+
+ # test card_id with variable
+ context['foobar'] = 'some-slug'
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:foobar|list }}')
+ t.render(context)
+ assert mock_send.call_args_list[0][0][0].url.startswith('http://127.0.0.1:8999/api/cards/some-slug/list?')
+
+ # test with no secondary param
+ KNOWN_SERVICES = copy.deepcopy(settings.KNOWN_SERVICES)
+ KNOWN_SERVICES['wcs'] = {'default': {'url': 'http://127.0.0.3:8999/'}}
+ settings.KNOWN_SERVICES = KNOWN_SERVICES
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"bar"|list }}')
+ t.render(context)
+ assert mock_send.call_args_list[0][0][0].url.startswith('http://127.0.0.3:8999/api/cards/bar/list?')
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_with_custom_view(mock_send, context, nocache):
+ t = Template('{{ cards|objects:"foo"|with_custom_view:"foobar"|list }}')
+ t.render(context)
+ assert mock_send.call_args_list[0][0][0].url.startswith(
+ 'http://127.0.0.1:8999/api/cards/foo/list/foobar?'
+ ) # primary service
+ t = Template('{{ cards|objects:"default:foo"|with_custom_view:"foobar"|list }}')
+ t.render(context)
+ assert mock_send.call_args_list[0][0][0].url.startswith(
+ 'http://127.0.0.1:8999/api/cards/foo/list/foobar?'
+ )
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"other:foo"|with_custom_view:"foobar"|list }}')
+ t.render(context)
+ assert mock_send.call_args_list[0][0][0].url.startswith(
+ 'http://127.0.0.2:8999/api/cards/foo/list/foobar?'
+ )
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"unknown:foo"|with_custom_view:"foobar"|list }}')
+ t.render(context)
+ assert mock_send.call_args_list == [] # unknown, not evaluated
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_full(mock_send, context, nocache):
+ t = Template('{{ cards|objects:"foo"|list }}')
+ t.render(context)
+ assert 'full=on&' not in mock_send.call_args_list[0][0][0].url
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|get_full|list }}')
+ t.render(context)
+ assert 'full=on&' in mock_send.call_args_list[0][0][0].url
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_errors(mock_send, context, nocache):
+ t = Template('{{ cards|objects:"foo"|list }}')
+
+ with mock.patch('publik_django_templatetags.wcs.context_processors.requests.get') as requests_get:
+ mock_resp = Response()
+ mock_resp.status_code = 500
+ requests_get.return_value = mock_resp
+ assert t.render(context) == "[]"
+
+ with mock.patch('publik_django_templatetags.wcs.context_processors.requests.get') as requests_get:
+ requests_get.side_effect = ConnectionError()
+ requests_get.return_value = mock_resp
+ assert t.render(context) == "[]"
+
+ with mock.patch('publik_django_templatetags.wcs.context_processors.requests.get') as requests_get:
+ mock_resp = Response()
+ mock_resp.status_code = 404
+ requests_get.return_value = mock_resp
+ assert t.render(context) == "[]"
+
+ mock_send.side_effect = lambda *a, **k: MockedRequestResponse(content=json.dumps({'err': 1}))
+ assert t.render(context) == "[]"
+
+ mock_send.side_effect = lambda *a, **k: MockedRequestResponse(content=json.dumps({}))
+ assert t.render(context) == "[]"
+
+ mock_send.side_effect = lambda *a, **k: MockedRequestResponse(content=json.dumps({'data': None}))
+ assert t.render(context) == "[]"
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_access_control(mock_send, context, nocache):
+ # no user in context
+ t = Template('{{ cards|objects:"foo"|list }}')
+ t.render(context)
+ assert 'NameID' not in mock_send.call_args_list[0][0][0].url
+ assert 'email' not in mock_send.call_args_list[0][0][0].url
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|access_control:request.user|list }}')
+ t.render(context)
+ assert 'NameID=&' in mock_send.call_args_list[0][0][0].url
+ assert 'email=&' in mock_send.call_args_list[0][0][0].url
+
+ # current user in anonymous
+ context['request'].user = MockAnonymousUser()
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|list }}')
+ t.render(context)
+ assert 'NameID' not in mock_send.call_args_list[0][0][0].url
+ assert 'email' not in mock_send.call_args_list[0][0][0].url
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|access_control:request.user|list }}')
+ t.render(context)
+ assert 'NameID=&' in mock_send.call_args_list[0][0][0].url
+ assert 'email=&' in mock_send.call_args_list[0][0][0].url
+
+ # current user with uuid
+ context['request'].user = MockUserWithNameId()
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|list }}')
+ t.render(context)
+ assert 'NameID' not in mock_send.call_args_list[0][0][0].url
+ assert 'email' not in mock_send.call_args_list[0][0][0].url
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|access_control:request.user|list }}')
+ t.render(context)
+ assert 'NameID=xyz&' in mock_send.call_args_list[0][0][0].url
+ assert 'email' not in mock_send.call_args_list[0][0][0].url
+
+ # current user without uuid
+ context['request'].user = MockUser()
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|list }}')
+ t.render(context)
+ assert 'NameID' not in mock_send.call_args_list[0][0][0].url
+ assert 'email' not in mock_send.call_args_list[0][0][0].url
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|access_control:request.user|list }}')
+ t.render(context)
+ assert 'NameID' not in mock_send.call_args_list[0][0][0].url
+ assert 'email=foo%40example.net&' in mock_send.call_args_list[0][0][0].url
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_count(mock_send, context, nocache):
+ t = Template('{{ cards|objects:"foo"|count }}')
+ assert t.render(context) == "2"
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_filter(mock_send, context, nocache):
+ t = Template('{{ cards|objects:"foo"|filter_by:"foo"|filter_value:"bar"|list }}')
+ t.render(context)
+ assert 'filter-foo=bar&' in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template(
+ '{{ cards|objects:"foo"|filter_by:"foo"|filter_value:"bar"|filter_by:"foo2"|filter_value:"bar2"|list }}'
+ )
+ t.render(context)
+ assert 'filter-foo=bar&' in mock_send.call_args_list[0][0][0].url
+ assert 'filter-foo2=bar2&' in mock_send.call_args_list[0][0][0].url
+
+ # check boolean
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|filter_by:"foo"|filter_value:True|list }}')
+ t.render(context)
+ assert 'filter-foo=true&' in mock_send.call_args_list[0][0][0].url
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|filter_by:"foo"|filter_value:False|list }}')
+ t.render(context)
+ assert 'filter-foo=false&' in mock_send.call_args_list[0][0][0].url
+
+ # check None
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|filter_by:"foo"|filter_value:None|list }}')
+ t.render(context)
+ assert 'filter-foo=&' in mock_send.call_args_list[0][0][0].url
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_filter_by_internal_id(mock_send, context, nocache):
+ t = Template('{{ cards|objects:"foo"|list }}')
+ t.render(context)
+ assert 'filter-internal-id' not in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|filter_by_internal_id:None|list }}')
+ t.render(context)
+ assert 'filter-internal-id' not in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|filter_by_internal_id:""|list }}')
+ t.render(context)
+ assert 'filter-internal-id' not in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|filter_by_internal_id:"42"|list }}')
+ t.render(context)
+ assert 'filter-internal-id=42&' in mock_send.call_args_list[0][0][0].url
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_filter_by_number(mock_send, context, nocache):
+ t = Template('{{ cards|objects:"foo"|list }}')
+ t.render(context)
+ assert 'filter-number' not in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|filter_by_number:None|list }}')
+ t.render(context)
+ assert 'filter-number' not in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|filter_by_number:""|list }}')
+ t.render(context)
+ assert 'filter-number' not in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|filter_by_number:"42-35"|list }}')
+ t.render(context)
+ assert 'filter-number=42-35&' in mock_send.call_args_list[0][0][0].url
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_filter_by_user(mock_send, context, nocache):
+ t = Template('{{ cards|objects:"foo"|filter_by_user:request.user|list }}')
+ t.render(context)
+ assert 'filter-user-uuid' not in mock_send.call_args_list[0][0][0].url
+
+ context['request'].user = MockAnonymousUser()
+ mock_send.reset_mock()
+ t.render(context)
+ assert 'filter-user-uuid' not in mock_send.call_args_list[0][0][0].url
+
+ context['request'].user = MockUser()
+ mock_send.reset_mock()
+ t.render(context)
+ assert 'filter-user-uuid' not in mock_send.call_args_list[0][0][0].url
+
+ context['request'].user = MockUserWithNameId()
+ mock_send.reset_mock()
+ t.render(context)
+ assert 'filter-user-uuid=xyz&' in mock_send.call_args_list[0][0][0].url
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_filter_by_status(mock_send, context, nocache):
+ t = Template('{{ cards|objects:"foo"|list }}')
+ t.render(context)
+ assert 'filter=&' not in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|filter_by_status:None|list }}')
+ t.render(context)
+ assert 'filter=&' not in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|filter_by_status:""|list }}')
+ t.render(context)
+ assert 'filter=&' not in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{{ cards|objects:"foo"|filter_by_status:"foobar"|list }}')
+ t.render(context)
+ assert 'filter=foobar&' in mock_send.call_args_list[0][0][0].url
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_getlist(mock_send, context, nocache):
+ t = Template('{% for v in cards|objects:"foo"|getlist:"id" %}{{ v }},{% endfor %}')
+ t.render(context)
+ assert t.render(context) == "1,2,"
+ t = Template('{% for v in cards|objects:"foo"|getlist:"fields" %}{{ v }},{% endfor %}')
+ t.render(context)
+ result = t.render(context)
+ if ''' in result:
+ # django 2.2
+ assert result == "{'foo': 'bar'},{'foo': 'baz'},"
+ else:
+ # django 3.2
+ assert result == "{'foo': 'bar'},{'foo': 'baz'},"
+ t = Template('{% for v in cards|objects:"foo"|getlist:"fields"|getlist:"foo" %}{{ v }},{% endfor %}')
+ t.render(context)
+ assert t.render(context) == "bar,baz,"
+ t = Template('{% for v in cards|objects:"foo"|getlist:"fields"|getlist:"unknown" %}{{ v }},{% endfor %}')
+ t.render(context)
+ assert t.render(context) == "None,None,"
+
+
+@mock.patch('requests.Session.send', side_effect=mocked_requests_send)
+def test_order_by(mock_send, context, nocache):
+ t = Template('{% for v in cards|objects:"foo" %}{{ v }},{% endfor %}')
+ t.render(context)
+ assert 'order_by' not in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{% for v in cards|objects:"foo"|order_by:"bar" %}{{ v }},{% endfor %}')
+ t.render(context)
+ assert 'order_by=bar' in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{% for v in cards|objects:"foo"|order_by:"-bar" %}{{ v }},{% endfor %}')
+ t.render(context)
+ assert 'order_by=-bar' in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{% for v in cards|objects:"foo"|order_by:"" %}{{ v }},{% endfor %}')
+ t.render(context)
+ assert 'order_by' not in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{% for v in cards|objects:"foo"|order_by:"bar"|order_by:"" %}{{ v }},{% endfor %}')
+ t.render(context)
+ assert 'order_by' not in mock_send.call_args_list[0][0][0].url
+
+ mock_send.reset_mock()
+ t = Template('{% for v in cards|objects:"foo"|order_by:""|order_by:"bar" %}{{ v }},{% endfor %}')
+ t.render(context)
+ assert 'order_by=bar' in mock_send.call_args_list[0][0][0].url