pricing: synchronise agendas, list & detail views (#65361)

This commit is contained in:
Lauréline Guérin 2022-05-20 16:13:28 +02:00
parent 1ee53fa3be
commit af62f2f8c6
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
24 changed files with 732 additions and 19 deletions

105
lingo/agendas/chrono.py Normal file
View File

@ -0,0 +1,105 @@
# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from django.conf import settings
from requests.exceptions import RequestException
from lingo.agendas.models import Agenda
from lingo.utils import requests
def is_chrono_enabled():
return hasattr(settings, 'KNOWN_SERVICES') and settings.KNOWN_SERVICES.get('chrono')
def get_chrono_service():
if not is_chrono_enabled():
return {}
return list(settings.KNOWN_SERVICES.get('chrono').values())[0]
def get_chrono_json(path, log_errors=True):
chrono_site = get_chrono_service()
if chrono_site is None:
return
try:
response = requests.get(
path,
remote_service=chrono_site,
without_user=True,
headers={'accept': 'application/json'},
log_errors=log_errors,
)
response.raise_for_status()
except RequestException as e:
if e.response is not None:
try:
# return json if available (on 404 responses by example)
return e.response.json()
except json.JSONDecodeError:
pass
return
return response.json()
def collect_agenda_data():
result = get_chrono_json('api/agenda/')
if result is None:
return
if result.get('data') is None:
return
agenda_data = []
for agenda in result['data']:
if agenda['kind'] != 'events':
continue
agenda_data.append(
{
'slug': agenda['slug'],
'label': agenda['text'],
'category_slug': agenda['category'],
'category_label': agenda['category_label'],
}
)
return agenda_data
def refresh_agendas():
result = collect_agenda_data()
if result is None:
return
# fetch existing agendas
existing_agendas = {a.slug: a for a in Agenda.objects.all()}
seen_agendas = []
# build agendas from chrono
for agenda_data in result:
slug = agenda_data['slug']
agenda = existing_agendas.get(slug) or Agenda(slug=slug)
for key, value in agenda_data.items():
if key == 'slug':
continue
setattr(agenda, key, value)
agenda.save()
seen_agendas.append(agenda.slug)
# now check outdated agendas
for slug, agenda in existing_agendas.items():
if slug not in seen_agendas:
agenda.delete()

View File

@ -0,0 +1,21 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('agendas', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='agenda',
name='category_label',
field=models.CharField(null=True, max_length=150, verbose_name='Category label'),
),
migrations.AddField(
model_name='agenda',
name='category_slug',
field=models.SlugField(null=True, max_length=160, verbose_name='Category identifier'),
),
]

View File

@ -26,6 +26,8 @@ from lingo.utils.misc import AgendaImportError, clean_import_data, generate_slug
class Agenda(models.Model):
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
category_label = models.CharField(_('Category label'), max_length=150, null=True)
category_slug = models.SlugField(_('Category identifier'), max_length=160, null=True)
def __str__(self):
return self.label

View File

@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.shortcuts import get_object_or_404
from django.urls import reverse
from .models import Agenda
@ -34,6 +35,12 @@ class AgendaMixin:
context['agenda'] = self.agenda
return context
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
if not kwargs.get('instance'):
kwargs['instance'] = self.model()
kwargs['instance'].agenda = self.agenda
return kwargs
def get_success_url(self):
# XXX: return reverse('lingo-manager-agenda-settings', kwargs={'pk': self.agenda.id})
return '/'
return reverse('lingo-manager-agenda-detail', kwargs={'pk': self.agenda.pk})

View File

@ -1,3 +1,13 @@
li span.identifier {
font-size: 80%;
opacity: 0.6;
}
h2 span.identifier {
font-size: 1rem;
opacity: 0.6;
}
div.paragraph {
background: white;
box-sizing: border-box;

View File

@ -1 +0,0 @@
{% extends "lingo/manager_agenda_view.html" %}

View File

@ -1 +0,0 @@
{% extends "lingo/manager_base.html" %}

View File

@ -0,0 +1,38 @@
{% extends "lingo/pricing/manager_agenda_list.html" %}
{% load i18n %}
{% block breadcrumb %}
{{ block.super }}
<a href="{% url 'lingo-manager-agenda-detail' agenda.pk %}">{{ agenda }}</a>
{% endblock %}
{% block appbar %}
<h2>{{ agenda }}
<span class="identifier">[{% trans "identifier:" %} {{ agenda.slug }}]</span>
</h2>
<span class="actions">
<a rel="popup" href="{% url 'lingo-manager-agenda-pricing-add' pk=agenda.pk %}">{% trans 'New pricing' %}</a>
</span>
{% endblock %}
{% block content %}
<div class="section">
<h3>{% trans "Pricing" context 'pricing' %}</h3>
<div>
{% if agenda_pricings %}
<ul class="objects-list single-links">
{% for agenda_pricing in agenda_pricings %}
<li><a href="{% url 'lingo-manager-agenda-pricing-detail' agenda.pk agenda_pricing.pk %}">{{ agenda_pricing.pricing }} ({{ agenda_pricing.date_start|date:'d/m/Y' }} - {{ agenda_pricing.date_end|date:'d/m/Y' }})</a></li>
{% endfor %}
</ul>
{% else %}
<div class="big-msg-info">
{% blocktrans %}
This agenda doesn't have any pricing defined yet. Click on the "New pricing" button in
the top right of the page to add a first one.
{% endblocktrans %}
</div>
{% endif %}
</div>
</div>
{% endblock %}

View File

@ -0,0 +1,39 @@
{% extends "lingo/pricing/manager_pricing_list.html" %}
{% load i18n %}
{% block breadcrumb %}
{{ block.super }}
<a href="{% url 'lingo-manager-agenda-list' %}">{% trans 'Agendas' %}</a>
{% endblock %}
{% block appbar %}
<h2>{% trans 'Agendas' %}</h2>
<span class="actions">
<a href="{% url 'lingo-manager-agenda-sync' %}">{% trans 'Refresh agendas' %}</a>
</span>
{% endblock %}
{% block content %}
{% if object_list %}
{% regroup object_list by category_label as agenda_groups %}
{% for group in agenda_groups %}
<div class="section">
{% if group.grouper %}<h3>{{ group.grouper }}</h3>{% elif not forloop.first %}<h3>{% trans "Misc" %}</h3>{% endif %}
<ul class="objects-list single-links">
{% for object in group.list %}
<li>
<a href="{% url 'lingo-manager-agenda-detail' object.pk %}">{{ object.label }} <span class="identifier">[{% trans "identifier:" %} {{ object.slug }}]</a>
</li>
{% endfor %}
</ul>
</div>
{% endfor %}
{% else %}
<div class="big-msg-info">
{% blocktrans %}
This site doesn't have any agenda yet. Click on the "Refresh agendas" button in the top
right of the page to synchronize them.
{% endblocktrans %}
</div>
{% endif %}
{% endblock %}

View File

@ -1,4 +1,4 @@
{% extends "lingo/manager_agenda_settings.html" %}
{% extends "lingo/pricing/manager_agenda_detail.html" %}
{% load i18n %}
{% block breadcrumb %}

View File

@ -1,4 +1,4 @@
{% extends "lingo/manager_agenda_settings.html" %}
{% extends "lingo/pricing/manager_agenda_detail.html" %}
{% load i18n %}
{% block breadcrumb %}
@ -28,7 +28,7 @@
{% if object.pk %}
<a class="cancel" href="{% url 'lingo-manager-agenda-pricing-detail' agenda.pk object.pk %}">{% trans 'Cancel' %}</a>
{% else %}
<a class="cancel" href="{% url 'lingo-manager-agenda-settings' agenda.pk %}">{% trans 'Cancel' %}</a>
<a class="cancel" href="{% url 'lingo-manager-agenda-detail' agenda.pk %}">{% trans 'Cancel' %}</a>
{% endif %}
</div>
</form>

View File

@ -14,6 +14,7 @@
<li><a rel="popup" href="{% url 'lingo-manager-pricing-config-import' %}">{% trans 'Import' %}</a></li>
<li><a rel="popup" href="{% url 'lingo-manager-pricing-config-export' %}" data-autoclose-dialog="true">{% trans 'Export' %}</a></li>
</ul>
<a href="{% url 'lingo-manager-agenda-list' %}">{% trans 'Agendas' %}</a>
<a href="{% url 'lingo-manager-pricing-criteria-list' %}">{% trans 'Criterias' %}</a>
<a rel="popup" href="{% url 'lingo-manager-pricing-add' %}">{% trans 'New pricing model' %}</a>
</span>

View File

@ -123,6 +123,21 @@ urlpatterns = [
staff_member_required(views.criteria_delete),
name='lingo-manager-pricing-criteria-delete',
),
url(
r'^agendas/sync/$',
staff_member_required(views.agenda_sync),
name='lingo-manager-agenda-sync',
),
url(
r'^agendas/$',
staff_member_required(views.agenda_list),
name='lingo-manager-agenda-list',
),
url(
r'^agenda/(?P<pk>\d+)/$',
staff_member_required(views.agenda_detail),
name='lingo-manager-agenda-detail',
),
url(
r'^agenda/(?P<pk>\d+)/pricing/add/$',
staff_member_required(views.agenda_pricing_add),

View File

@ -28,9 +28,18 @@ from django.urls import reverse, reverse_lazy
from django.utils.encoding import force_text
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import ungettext
from django.views.generic import CreateView, DeleteView, DetailView, FormView, ListView, UpdateView
from django.views.generic import (
CreateView,
DeleteView,
DetailView,
FormView,
ListView,
RedirectView,
UpdateView,
)
from django.views.generic.detail import SingleObjectMixin
from lingo.agendas.chrono import refresh_agendas
from lingo.agendas.models import Agenda
from lingo.agendas.views import AgendaMixin
from lingo.pricing.forms import (
@ -569,6 +578,47 @@ class CriteriaDeleteView(DeleteView):
criteria_delete = CriteriaDeleteView.as_view()
class AgendaListView(ListView):
template_name = 'lingo/pricing/manager_agenda_list.html'
model = Agenda
def get_queryset(self):
queryset = super().get_queryset()
return queryset.order_by('category_label', 'label')
agenda_list = AgendaListView.as_view()
class AgendaSyncView(RedirectView):
def get(self, request, *args, **kwargs):
refresh_agendas()
messages.info(self.request, _('Agendas refreshed.'))
return super().get(request, *args, **kwargs)
def get_redirect_url(self, *args, **kwargs):
return reverse('lingo-manager-agenda-list')
agenda_sync = AgendaSyncView.as_view()
class AgendaDetailView(AgendaMixin, DetailView):
template_name = 'lingo/pricing/manager_agenda_detail.html'
model = Agenda
def get_context_data(self, **kwargs):
kwargs['agenda_pricings'] = (
AgendaPricing.objects.filter(agenda=self.agenda)
.select_related('pricing')
.order_by('date_start', 'date_end')
)
return super().get_context_data(**kwargs)
agenda_detail = AgendaDetailView.as_view()
class AgendaPricingAddView(AgendaMixin, CreateView):
template_name = 'lingo/pricing/manager_agenda_pricing_form.html'
model = AgendaPricing

View File

@ -168,6 +168,13 @@ MELLON_USERNAME_TEMPLATE = '{attributes[name_id_content]}'
MELLON_IDENTITY_PROVIDERS = []
# proxies argument passed to all python-request methods
# (see http://docs.python-requests.org/en/master/user/advanced/#proxies)
REQUESTS_PROXIES = None
# timeout used in python-requests call, in seconds
# we use 28s by default: timeout just before web server, which is usually 30s
REQUESTS_TIMEOUT = 28
# default site
SITE_BASE_URL = 'http://localhost'

View File

@ -0,0 +1,18 @@
# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .requests_wrapper import requests

View File

@ -15,7 +15,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import urllib.parse
from django.conf import settings
from django.core.exceptions import FieldDoesNotExist, ValidationError
from django.utils.translation import ugettext_lazy as _
@ -69,3 +71,13 @@ def clean_import_data(cls, data):
except ValidationError:
raise AgendaImportError(_('Bad slug format "%s"') % value)
return cleaned_data
def get_known_service_for_url(url):
netloc = urllib.parse.urlparse(url).netloc
for services in settings.KNOWN_SERVICES.values():
for service in services.values():
remote_url = service.get('url')
if urllib.parse.urlparse(remote_url).netloc == netloc:
return service
return None

View File

@ -0,0 +1,155 @@
# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import hashlib
import logging
import urllib.parse
from io import BytesIO
from django.conf import settings
from django.core.cache import cache
from django.utils.encoding import smart_bytes
from django.utils.http import urlencode
from requests import Response
from requests import Session as RequestsSession
from requests.auth import AuthBase
from .misc import get_known_service_for_url
from .signature import sign_url
class NothingInCacheException(Exception):
pass
class PublikSignature(AuthBase):
def __init__(self, secret):
self.secret = secret
def __call__(self, request):
request.url = sign_url(request.url, self.secret)
return request
class Requests(RequestsSession):
def request(self, method, url, **kwargs):
remote_service = kwargs.pop('remote_service', None)
cache_duration = kwargs.pop('cache_duration', 15)
invalidate_cache = kwargs.pop('invalidate_cache', False)
user = kwargs.pop('user', None)
django_request = kwargs.pop('django_request', None)
without_user = kwargs.pop('without_user', False)
federation_key = kwargs.pop('federation_key', 'auto') # 'auto', 'email', 'nameid'
raise_if_not_cached = kwargs.pop('raise_if_not_cached', False)
log_errors = kwargs.pop('log_errors', True)
# don't use persistent cookies
self.cookies.clear()
# search in legacy urls
legacy_urls_mapping = getattr(settings, 'LEGACY_URLS_MAPPING', None)
if legacy_urls_mapping:
splitted_url = urllib.parse.urlparse(url)
hostname = splitted_url.netloc
if hostname in legacy_urls_mapping:
url = splitted_url._replace(netloc=legacy_urls_mapping[hostname]).geturl()
if remote_service == 'auto':
remote_service = get_known_service_for_url(url)
if remote_service:
# only keeps the path (URI) in url parameter, scheme and netloc are
# in remote_service
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(url)
url = urllib.parse.urlunparse(('', '', path, params, query, fragment))
else:
logging.warning('service not found in settings.KNOWN_SERVICES for %s', url)
if remote_service:
if isinstance(user, dict):
query_params = user.copy()
elif not user or not user.is_authenticated:
if without_user:
query_params = {}
else:
query_params = {'NameID': '', 'email': ''}
else:
query_params = {}
if federation_key == 'nameid':
query_params['NameID'] = user.get_name_id()
elif federation_key == 'email':
query_params['email'] = user.email
else: # 'auto'
user_name_id = user.get_name_id()
if user_name_id:
query_params['NameID'] = user_name_id
else:
query_params['email'] = user.email
if remote_service.get('orig'):
query_params['orig'] = remote_service.get('orig')
remote_service_base_url = remote_service.get('url')
scheme, netloc, dummy, params, old_query, fragment = urllib.parse.urlparse(
remote_service_base_url
)
query = urlencode(query_params)
if '?' in url:
path, old_query = url.split('?', 1)
query += '&' + old_query
else:
path = url
url = urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))
if method == 'GET' and cache_duration:
# handle cache
params = urlencode(kwargs.get('params', {}))
cache_key = hashlib.md5(smart_bytes(url + params)).hexdigest()
cache_content = cache.get(cache_key)
if cache_content and not invalidate_cache:
response = Response()
response.status_code = 200
response.raw = BytesIO(smart_bytes(cache_content))
return response
elif raise_if_not_cached:
raise NothingInCacheException()
if remote_service: # sign
kwargs['auth'] = PublikSignature(remote_service.get('secret'))
kwargs['timeout'] = kwargs.get('timeout') or settings.REQUESTS_TIMEOUT
response = super().request(method, url, **kwargs)
if log_errors and (response.status_code // 100 != 2):
extra = {}
if django_request:
extra['request'] = django_request
if log_errors == 'warn':
logging.warning(
'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
)
else:
logging.error(
'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
)
if method == 'GET' and cache_duration and (response.status_code // 100 == 2):
cache.set(cache_key, response.content, cache_duration)
return response
requests = Requests()

52
lingo/utils/signature.py Normal file
View File

@ -0,0 +1,52 @@
# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import hashlib
import hmac
import random
import urllib.parse
from django.utils.encoding import smart_bytes
from django.utils.http import quote, urlencode
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
parsed = urllib.parse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
if timestamp is None:
timestamp = datetime.datetime.utcnow()
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
if nonce is None:
nonce = hex(random.getrandbits(128))[2:]
new_query = query
if new_query:
new_query += '&'
new_query += urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&signature=' + quote(signature)
return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
digestmod = getattr(hashlib, algo)
hash = hmac.HMAC(smart_bytes(key), digestmod=digestmod, msg=smart_bytes(s))
return hash.digest()

View File

View File

@ -0,0 +1,153 @@
import json
from unittest import mock
import pytest
from requests.exceptions import ConnectionError
from requests.models import Response
from lingo.agendas.chrono import collect_agenda_data, refresh_agendas
from lingo.agendas.models import Agenda
pytestmark = pytest.mark.django_db
AGENDA_DATA = [
{
"slug": "events-a",
"kind": "events",
"text": "Events A",
"category": None,
"category_label": None,
},
{
"slug": "events-b",
"kind": "events",
"text": "Events B",
"category": "foo",
"category_label": "Foo",
},
{
"slug": "meetings-a",
"kind": "meetings",
"text": "Meetings A",
"category": None,
"category_label": None,
},
{
"slug": "virtual-b",
"kind": "virtual",
"text": "Virtual B",
"category": "foo",
"category_label": "Foo",
},
]
class MockedRequestResponse(mock.Mock):
status_code = 200
def json(self):
return json.loads(self.content)
def test_collect_agenda_data_no_service(settings):
settings.KNOWN_SERVICES = {}
assert collect_agenda_data() is None
settings.KNOWN_SERVICES = {'other': []}
assert collect_agenda_data() is None
def test_collect_agenda_data():
with mock.patch('requests.Session.get') as requests_get:
requests_get.side_effect = ConnectionError()
assert collect_agenda_data() is None
with mock.patch('requests.Session.get') as requests_get:
mock_resp = Response()
mock_resp.status_code = 500
requests_get.return_value = mock_resp
assert collect_agenda_data() is None
with mock.patch('requests.Session.get') as requests_get:
mock_resp = Response()
mock_resp.status_code = 404
requests_get.return_value = mock_resp
assert collect_agenda_data() is None
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps({'foo': 'bar'}))
assert collect_agenda_data() is None
data = {'data': []}
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
assert collect_agenda_data() == []
assert requests_get.call_args_list[0][0] == ('api/agenda/',)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
data = {'data': AGENDA_DATA}
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
assert collect_agenda_data() == [
{'category_label': None, 'category_slug': None, 'label': 'Events A', 'slug': 'events-a'},
{'category_label': 'Foo', 'category_slug': 'foo', 'label': 'Events B', 'slug': 'events-b'},
]
@mock.patch('lingo.agendas.chrono.collect_agenda_data')
def test_refresh_agendas(mock_collect):
Agenda.objects.create(label='foo')
# error during collect
mock_collect.return_value = None
refresh_agendas()
assert Agenda.objects.count() == 1 # no changes
# 2 agendas found
mock_collect.return_value = [
{'category_label': None, 'category_slug': None, 'label': 'Events A', 'slug': 'events-a'},
{'category_label': 'Foo', 'category_slug': 'foo', 'label': 'Events B', 'slug': 'events-b'},
]
# agendas don't exist, create them
refresh_agendas()
assert Agenda.objects.count() == 2
agenda1 = Agenda.objects.all().order_by('pk')[0]
agenda2 = Agenda.objects.all().order_by('pk')[1]
assert agenda1.label == "Events A"
assert agenda1.slug == "events-a"
assert agenda1.category_label is None
assert agenda1.category_slug is None
assert agenda2.label == "Events B"
assert agenda2.slug == "events-b"
assert agenda2.category_label == 'Foo'
assert agenda2.category_slug == 'foo'
# again, but some attributes are wrong
agenda1.label = "Wrong"
agenda1.category_label = 'Foo'
agenda1.category_slug = 'foo'
agenda1.save()
agenda2.label = "Wrong"
agenda2.category_label is None
agenda2.category_slug is None
agenda2.save()
refresh_agendas()
assert Agenda.objects.count() == 2
new_agenda1 = Agenda.objects.all().order_by('pk')[0]
new_agenda2 = Agenda.objects.all().order_by('pk')[1]
assert new_agenda1.pk == agenda1.pk
assert new_agenda1.label == "Events A"
assert new_agenda1.slug == "events-a"
assert new_agenda1.category_label is None
assert new_agenda1.category_slug is None
assert new_agenda2.pk == agenda2.pk
assert new_agenda2.label == "Events B"
assert new_agenda2.slug == "events-b"
assert new_agenda2.category_label == 'Foo'
assert new_agenda2.category_slug == 'foo'
# no agenda in chrono
mock_collect.return_value = []
refresh_agendas()
assert Agenda.objects.count() == 0

View File

@ -51,7 +51,6 @@ def test_import_export(app):
old_stdin = sys.stdin
sys.stdin = StringIO(json.dumps({}))
agenda = Agenda.objects.create(label='Foo Bar')
pricing = Pricing.objects.create(label='Foo')
AgendaPricing.objects.create(
agenda=agenda,

View File

@ -1,6 +1,7 @@
import copy
import datetime
import json
from unittest import mock
import pytest
from webtest import Upload
@ -12,7 +13,7 @@ from tests.utils import login
pytestmark = pytest.mark.django_db
def test_export_site(settings, freezer, app, admin_user):
def test_export_site(freezer, app, admin_user):
freezer.move_to('2020-06-15')
login(app)
resp = app.get('/manage/pricing/')
@ -48,7 +49,7 @@ def test_export_site(settings, freezer, app, admin_user):
assert 'pricing_models' not in site_json
def test_add_pricing(settings, app, admin_user):
def test_add_pricing(app, admin_user):
app = login(app)
resp = app.get('/manage/')
resp = resp.click('Pricing')
@ -425,7 +426,7 @@ def test_pricing_reorder_categories(app, admin_user):
]
def test_add_category(settings, app, admin_user):
def test_add_category(app, admin_user):
app = login(app)
resp = app.get('/manage/')
resp = resp.click('Pricing')
@ -685,13 +686,24 @@ def test_import_criteria_category(app, admin_user):
assert Criteria.objects.count() == 6
@pytest.mark.xfail(reason='agenda views not yet implemented')
def test_add_agenda_pricing(settings, app, admin_user):
@mock.patch('lingo.pricing.views.refresh_agendas')
def test_refresh_agendas(mock_refresh, app, admin_user):
app = login(app)
resp = app.get('/manage/pricing/agendas/')
resp = resp.click('Refresh agendas')
assert resp.location.endswith('/manage/pricing/agendas/')
resp = resp.follow()
assert "Agendas refreshed." in resp
assert mock_refresh.call_args_list == [mock.call()]
def test_add_agenda_pricing(app, admin_user):
agenda = Agenda.objects.create(label='Foo Bar')
pricing = Pricing.objects.create(label='Model')
app = login(app)
resp = app.get('/manage/agendas/%s/settings' % agenda.pk)
resp = app.get('/manage/pricing/agendas/')
resp = resp.click(href='/manage/pricing/agenda/%s/' % agenda.pk)
resp = resp.click('New pricing')
resp.form['pricing'] = pricing.pk
resp.form['date_start'] = '2021-09-01'
@ -799,11 +811,10 @@ def test_delete_agenda_pricing(app, admin_user):
resp = app.get('/manage/pricing/agenda/%s/pricing/%s/' % (agenda.pk, agenda_pricing.pk))
resp = resp.click(href='/manage/pricing/agenda/%s/pricing/%s/delete/' % (agenda.pk, agenda_pricing.pk))
resp = resp.form.submit()
# XXX: assert resp.location.endswith('/manage/agendas/%s/settings' % agenda.pk)
assert resp.location.endswith('/manage/pricing/agenda/%s/' % agenda.pk)
assert AgendaPricing.objects.filter(pk=agenda_pricing.pk).exists() is False
@pytest.mark.xfail(reason='agenda settings view not yet implemented')
def test_detail_agenda_pricing(app, admin_user):
agenda = Agenda.objects.create(label='Foo Bar')
pricing = Pricing.objects.create(label='Model')
@ -822,7 +833,7 @@ def test_detail_agenda_pricing(app, admin_user):
)
app = login(app)
resp = app.get('/manage/agendas/%s/settings' % agenda.pk)
resp = app.get('/manage/pricing/agenda/%s/' % agenda.pk)
resp = resp.click(href='/manage/pricing/agenda/%s/pricing/%s/' % (agenda.pk, agenda_pricing.pk))
app.get('/manage/pricing/agenda/%s/pricing/%s/' % (agenda.pk, agenda_pricing2.pk), status=404)
@ -1377,7 +1388,6 @@ def test_edit_agenda_pricing_matrix_1_category(app, admin_user):
)
@pytest.mark.xfail(reason='agenda views not yet implemented')
def test_edit_agenda_pricing_matrix_empty(app, admin_user):
agenda = Agenda.objects.create(label='Foo bar')
pricing = Pricing.objects.create(label='Foo bar')

View File

@ -18,3 +18,24 @@ DATABASES = {
},
}
}
KNOWN_SERVICES = {
'chrono': {
'default': {
'title': 'test',
'url': 'http://chrono.example.org',
'secret': 'lingo',
'orig': 'lingo',
'backoffice-menu-url': 'http://chrono.example.org/manage/',
'secondary': False,
},
'other': {
'title': 'other',
'url': 'http://other.chrono.example.org',
'secret': 'lingo',
'orig': 'lingo',
'backoffice-menu-url': 'http://other.chrono.example.org/manage/',
'secondary': True,
},
},
}