misc: remove newsletter app (#53541)
gitea-wip/combo/pipeline/head Build started... Details
gitea/combo/pipeline/head Build started... Details

This commit is contained in:
Lauréline Guérin 2021-04-29 14:48:45 +02:00
parent 2a151c45d5
commit 85d3f77a7f
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
16 changed files with 5 additions and 843 deletions

View File

@ -1,75 +0,0 @@
Combo newsletters cell
======================
This cell is enabled by default.
It expects a webservice returning newsletters and user subscriptions in the
following format:
[{'id': '1', 'text': 'Democratie locale',
'transports': [{'id': 'mail', 'text': 'mail'}]},
{'id': '2', 'text': 'Rencontres de quartiers',
'transports': [{'id': 'mail', 'text': 'mail'}]},
{'id': '3', 'text': 'Environnement',
'transports': [{'id': 'mail', 'text': 'mail'},
{'id': 'sms', 'text': 'sms'},
{'id': 'rss', 'text': 'rss'}]},
{'id': '4', 'text': u'Marchés publics',
'transports': [{'id': 'mail', 'text': 'mail'},
{'id': 'rss', 'text': 'rss'}]},
{'id': '5', 'text': "Offres d'emploi",
'transports': [{'id': 'mail', 'text': 'mail'},
{'id': 'rss', 'text': 'rss'}]},
{'id': '6', 'text': 'Infos créche',
'transports': [{'id': 'sms', 'text': 'sms'},
{'id': 'rss', 'text': 'rss'}]},
{'id': '7', 'text': 'Familles',
'transports': [{'id': 'mail', 'text': 'mail'},
{'id': 'sms', 'text': 'sms'}]},
{'id': '8', 'text': 'Travaux',
'transports': [{'id': 'mail', 'text': 'mail'},
{'id': 'sms', 'text': 'sms'},
{'id': 'rss', 'text': 'rss'}]}]
The url to the webservice should be provided in the instatiation form. The
fields **resources_restrictions** and **transports_restrictions** allow to
filter the newsletters by their name and transport means.
**resources_restrictions** field is a comma separated list of newsletters
slugs. For example: __rencontres-de-quartiers,infos-creche__.
In this case only the following newsletters will be exposed in the
subscriptions form:
[{'id': '2', 'text': 'Rencontres de quartiers',
'transports': [{'id': 'mail', 'text': 'mail'}]},
{'id': '6', 'text': 'Infos créche',
'transports': [{'id': 'sms', 'text': 'sms'},
{'id': 'rss', 'text': 'rss'}]}]
**transport_restrictions** field is a comma separated list of transport types.
Example: __sms,rss__
In this case only the newsletters containing one of these transports will be
shown:
[{'id': '3', 'text': 'Environnement',
'transports': [{'id': 'mail', 'text': 'mail'},
{'id': 'sms', 'text': 'sms'},
{'id': 'rss', 'text': 'rss'}]},
{'id': '4', 'text': u'Marchés publics',
'transports': [{'id': 'mail', 'text': 'mail'},
{'id': 'rss', 'text': 'rss'}]},
{'id': '5', 'text': "Offres d'emploi",
'transports': [{'id': 'mail', 'text': 'mail'},
{'id': 'rss', 'text': 'rss'}]},
{'id': '6', 'text': 'Infos créche',
'transports': [{'id': 'sms', 'text': 'sms'},
{'id': 'rss', 'text': 'rss'}]},
{'id': '7', 'text': 'Familles',
'transports': [{'id': 'mail', 'text': 'mail'},
{'id': 'sms', 'text': 'sms'}]},
{'id': '8', 'text': 'Travaux',
'transports': [{'id': 'mail', 'text': 'mail'},
{'id': 'sms', 'text': 'sms'},
{'id': 'rss', 'text': 'rss'}]}]

View File

@ -1,31 +0,0 @@
# combo - content management system
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import django.apps
from django.utils.translation import ugettext_lazy as _
class AppConfig(django.apps.AppConfig):
name = 'combo.apps.newsletters'
verbose_name = _('Newsletters')
def get_before_urls(self):
from . import urls
return urls.urlpatterns
default_app_config = 'combo.apps.newsletters.AppConfig'

View File

@ -1,83 +0,0 @@
# combo - content management system
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from django import forms
from django.utils.translation import ugettext_lazy as _
class NewslettersManageForm(forms.Form):
def __init__(self, *args, **kwargs):
logger = logging.getLogger(__name__)
self.request = kwargs.pop('request')
self.user = self.request.user
self.instance = kwargs.pop('instance')
self.themes = set()
super(NewslettersManageForm, self).__init__(*args, **kwargs)
# initialize cleane data in order to be able to add errors
self.cleaned_data = {}
try:
newsletters = self.instance.get_newsletters()
except Exception as e:
self.add_error(None, _('An error occured while getting newsletters. Please try later.'))
logger.error('Error occured while getting newsletters: %r', e)
return
self.params = {}
user_name_id = self.user.get_name_id()
if user_name_id:
self.params['uuid'] = user_name_id
# get mobile number from mellon session as it is not user attribute
if self.request.session.get('mellon_session'):
self.params['mobile'] = self.request.session['mellon_session'].get('mobile', '')
try:
subscriptions = self.instance.get_subscriptions(self.user, **self.params)
except Exception as e:
self.add_error(None, _('An error occured while getting subscriptions. Please try later.'))
logger.error('Error occured while getting subscriptions: %r', e)
return
for newsletter in newsletters:
if not self.instance.check_resource(newsletter['text']):
continue
choices = []
initial = []
for transport in newsletter['transports']:
if not self.instance.check_transport(transport['id']):
continue
self.themes.add((transport['id'], transport['text']))
choices.append((transport['id'], ''))
if transport in newsletter['transports']:
for subscription in subscriptions:
if subscription['id'] == newsletter['id']:
initial = [t['id'] for t in subscription['transports']]
self.fields[newsletter['id']] = forms.MultipleChoiceField(
label=newsletter['text'],
help_text=transport['id'],
choices=choices,
initial=initial,
widget=forms.CheckboxSelectMultiple(),
required=False,
)
def save(self):
self.full_clean()
subscriptions = []
for key, value in self.cleaned_data.items():
subscriptions.append({'id': key, 'transports': value})
self.instance.set_subscriptions(subscriptions, self.user, **self.params)

View File

@ -1,58 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('auth', '0001_initial'),
('data', '0013_parameterscell'),
]
operations = [
migrations.CreateModel(
name='NewslettersCell',
fields=[
(
'id',
models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True),
),
('placeholder', models.CharField(max_length=20)),
('order', models.PositiveIntegerField()),
('slug', models.SlugField(verbose_name='Slug', blank=True)),
('public', models.BooleanField(default=True, verbose_name='Public')),
(
'restricted_to_unlogged',
models.BooleanField(default=False, verbose_name='Restrict to unlogged users'),
),
('title', models.CharField(max_length=128, verbose_name='Title')),
('url', models.URLField(max_length=128, verbose_name='Newsletters service url')),
(
'resources_restrictions',
models.CharField(
help_text='list of resources(themes) separated by commas',
max_length=1024,
verbose_name='resources restrictions',
blank=True,
),
),
(
'transports_restrictions',
models.CharField(
help_text='list of transports separated by commas',
max_length=1024,
verbose_name='transports restrictions',
blank=True,
),
),
('groups', models.ManyToManyField(to='auth.Group', verbose_name='Groups', blank=True)),
('page', models.ForeignKey(to='data.Page', on_delete=models.CASCADE)),
],
options={
'verbose_name': 'Newsletters',
},
bases=(models.Model,),
),
]

View File

@ -1,19 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('newsletters', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='newsletterscell',
name='extra_css_class',
field=models.CharField(max_length=100, verbose_name='Extra classes for CSS styling', blank=True),
),
]

View File

@ -1,23 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import datetime
from django.db import migrations, models
from django.utils.timezone import utc
class Migration(migrations.Migration):
dependencies = [
('newsletters', '0002_newsletterscell_extra_css_class'),
]
operations = [
migrations.AddField(
model_name='newsletterscell',
name='last_update_timestamp',
field=models.DateTimeField(default=datetime.datetime.now(utc), auto_now=True),
preserve_default=False,
),
]

View File

@ -1,165 +0,0 @@
# combo - content management system
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import logging
from django.conf import settings
from django.db import models
from django.forms import models as model_forms
from django.template.defaultfilters import slugify
from django.utils.http import urlencode
from django.utils.translation import ugettext_lazy as _
from requests.exceptions import HTTPError, RequestException
from combo.data.library import register_cell_class
from combo.data.models import CellBase
from combo.utils import requests
from .forms import NewslettersManageForm
class SubscriptionsSaveError(Exception):
pass
@register_cell_class
class NewslettersCell(CellBase):
title = models.CharField(verbose_name=_('Title'), max_length=128)
url = models.URLField(verbose_name=_('Newsletters service url'), max_length=128)
resources_restrictions = models.CharField(
_('resources restrictions'),
blank=True,
max_length=1024,
help_text=_('list of resources(themes) separated by commas'),
)
transports_restrictions = models.CharField(
_('transports restrictions'),
blank=True,
max_length=1024,
help_text=_('list of transports separated by commas'),
)
template_name = 'newsletters/newsletters.html'
user_dependant = True
@classmethod
def is_enabled(cls):
return settings.NEWSLETTERS_CELL_ENABLED
class Meta:
verbose_name = _('Newsletters')
def get_default_form_class(self):
model_fields = ('title', 'url', 'resources_restrictions', 'transports_restrictions')
return model_forms.modelform_factory(self.__class__, fields=model_fields)
def simplify(self, name):
return slugify(name.strip())
def get_resources_restrictions(self):
return list(filter(None, map(self.simplify, self.resources_restrictions.strip().split(','))))
def get_transports_restrictions(self):
return list(filter(None, map(self.simplify, self.transports_restrictions.strip().split(','))))
def check_resource(self, resource):
restrictions = self.get_resources_restrictions()
if restrictions and self.simplify(resource) not in restrictions:
return False
return True
def check_transport(self, transport):
restrictions = self.get_transports_restrictions()
if restrictions and transport not in restrictions:
return False
return True
def filter_data(self, data):
filtered = []
for item in data:
if not self.check_resource(item['text']):
continue
for t in item['transports']:
if self.check_transport(t['id']):
filtered.append(item)
return filtered
def get_newsletters(self):
endpoint = self.url + 'newsletters/'
try:
response = requests.get(endpoint, remote_service='auto', cache_duration=60, without_user=True)
response.raise_for_status()
except RequestException:
return []
json_response = response.json()
return self.filter_data(json_response['data'])
def get_subscriptions(self, user, **kwargs):
endpoint = self.url + 'subscriptions/'
try:
response = requests.get(
endpoint, remote_service='auto', user=user, cache_duration=0, params=kwargs
)
response.raise_for_status()
except RequestException:
return []
json_response = response.json()
return self.filter_data(json_response['data'])
def set_subscriptions(self, subscriptions, user, **kwargs):
logger = logging.getLogger(__name__)
# uuid is mandatory to store subscriptions
if 'uuid' not in kwargs:
raise SubscriptionsSaveError
headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
endpoint = self.url + 'subscriptions/'
try:
response = requests.post(
endpoint,
remote_service='auto',
data=json.dumps(subscriptions),
user=user,
federation_key='email',
params=kwargs,
headers=headers,
)
response.raise_for_status()
if not response.json()['data']:
raise SubscriptionsSaveError
except HTTPError as e:
logger.error(
u'set subscriptions on %s returned an HTTP error code: %s',
e.response.request.url,
e.response.status_code,
)
raise SubscriptionsSaveError
except RequestException as e:
logger.error(u'set subscriptions on %s failed with exception: %s', endpoint, e)
raise SubscriptionsSaveError
def render(self, context):
user = context.get('user')
if user and user.is_authenticated:
form = NewslettersManageForm(instance=self, request=context['request'])
context['form'] = form
return super(NewslettersCell, self).render(context)
def is_visible(self, **kwargs):
user = kwargs.get('user')
if user is None or not user.is_authenticated:
return False
return super(NewslettersCell, self).is_visible(**kwargs)

View File

@ -1,43 +0,0 @@
{% load i18n %}
<h2>{{ cell.title }}</h2>
{% if form %}
{% if form.non_field_errors %}
{{ form.non_field_errors }}
{% else %}
<form method="post" action={% url 'newsletters-update' pk=cell.pk %}>
{% csrf_token %}
{% for field in form %}
{{ field.error }}
{% endfor %}
<table class="newsletters-form">
<thead>
<tr>
<td>{% trans "Theme" %}</td>
{% for id, theme in form.themes %}
<td data-id="{{ id }}">{{ theme }}</td>
{% endfor %}
</tr>
</thead>
<tbody>
{% for field in form %}
<tr>
<td>{{ field.label }}</td>
{% for id, theme in form.themes %}<td data-id="{{ id }}">
{% for w in field %}
{% with choice_value=w.data.value choice_value18=w.choice_value %}
{% if choice_value == id or choice_value18 == id %}{{ w }}{% endif %}
{% endwith %}
{% endfor %}
</td>
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
<button>{% trans "Modify" %}</button>
</form>
{% endif %}
{% else %}
<div>{% trans "No subscriptions" %}</div>
{% endif %}

View File

@ -1,12 +0,0 @@
from django.conf.urls import url
from django.contrib.auth.decorators import login_required
from .views import NewslettersView
urlpatterns = [
url(
r'^newsletters/(?P<pk>\w+)/update$',
login_required(NewslettersView.as_view()),
name='newsletters-update',
),
]

View File

@ -1,51 +0,0 @@
# combo - content management system
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib import messages
from django.http import HttpResponseRedirect
from django.utils.translation import ugettext_lazy as _
from django.views.generic import FormView
from .forms import NewslettersManageForm
from .models import NewslettersCell, SubscriptionsSaveError
class NewslettersView(FormView):
http_method_names = ['post']
form_class = NewslettersManageForm
def form_valid(self, form):
try:
form.save()
messages.info(self.request, _('Your subscriptions are successfully saved'))
except SubscriptionsSaveError:
messages.error(
self.request, _('An error occured while saving your subscriptions. Please try later.')
)
return super(NewslettersView, self).form_valid(form)
def get_form_kwargs(self):
kwargs = super(NewslettersView, self).get_form_kwargs()
self.instance = NewslettersCell.objects.get(pk=self.kwargs['pk'])
kwargs.update({'request': self.request, 'instance': self.instance})
return kwargs
def form_invalid(self, form):
messages.error(self.request, _('An error occured while saving your subscriptions. Please try later.'))
return HttpResponseRedirect(self.get_success_url())
def get_success_url(self):
return self.instance.page.get_online_url()

View File

@ -70,7 +70,6 @@ INSTALLED_APPS = (
'combo.apps.family',
'combo.apps.dataviz',
'combo.apps.lingo',
'combo.apps.newsletters',
'combo.apps.fargo',
'combo.apps.notifications',
'combo.apps.search',
@ -363,7 +362,6 @@ PWA_NOTIFICATION_ICON_URL = None
# hide work-in-progress/experimental/broken/legacy/whatever cells for now
BOOKING_CALENDAR_CELL_ENABLED = False
LEGACY_CHART_CELL_ENABLED = False
NEWSLETTERS_CELL_ENABLED = False
def debug_show_toolbar(request):

View File

@ -82,7 +82,6 @@ FAMILY_SERVICE = {'root': '/'}
BOOKING_CALENDAR_CELL_ENABLED = True
LEGACY_CHART_CELL_ENABLED = True
NEWSLETTERS_CELL_ENABLED = True
USER_PROFILE_CONFIG = {
'fields': [

View File

@ -862,7 +862,7 @@ def test_site_export_import_json(app, admin_user):
resp.form['site_file'] = Upload('site-export.json', site_export, 'application/json')
with CaptureQueriesContext(connection) as ctx:
resp = resp.form.submit()
assert len(ctx.captured_queries) in [303, 304]
assert len(ctx.captured_queries) in [298, 299]
assert Page.objects.count() == 4
assert PageSnapshot.objects.all().count() == 4
@ -873,7 +873,7 @@ def test_site_export_import_json(app, admin_user):
resp.form['site_file'] = Upload('site-export.json', site_export, 'application/json')
with CaptureQueriesContext(connection) as ctx:
resp = resp.form.submit()
assert len(ctx.captured_queries) == 272
assert len(ctx.captured_queries) == 268
assert set(Page.objects.get(slug='one').related_cells['cell_types']) == set(
['data_textcell', 'data_linkcell']
)
@ -2178,7 +2178,7 @@ def test_page_versionning(app, admin_user):
with CaptureQueriesContext(connection) as ctx:
resp2 = resp.click('view', index=1)
assert len(ctx.captured_queries) == 71
assert len(ctx.captured_queries) == 70
assert Page.snapshots.latest('pk').related_cells == {'cell_types': ['data_textcell']}
assert resp2.text.index('Hello world') < resp2.text.index('Foobar3')
@ -2239,7 +2239,7 @@ def test_page_versionning(app, admin_user):
resp = resp.click('restore', index=6)
with CaptureQueriesContext(connection) as ctx:
resp = resp.form.submit().follow()
assert len(ctx.captured_queries) == 144
assert len(ctx.captured_queries) == 142
resp2 = resp.click('See online')
assert resp2.text.index('Foobar1') < resp2.text.index('Foobar2') < resp2.text.index('Foobar3')

View File

@ -1,275 +0,0 @@
# -*- coding: utf-8 -*-
import json
import mock
import pytest
import requests
from django.contrib.auth.models import User
from django.template.defaultfilters import slugify
from combo.apps.newsletters.forms import NewslettersManageForm
from combo.apps.newsletters.models import NewslettersCell, SubscriptionsSaveError
from combo.data.models import Page
from combo.utils import check_query
pytestmark = pytest.mark.django_db
NEWSLETTERS = [
{'id': '1', 'text': 'Democratie locale', 'transports': [{'id': 'mail', 'text': 'mail'}]},
{'id': '2', 'text': 'Rencontres de quartiers', 'transports': [{'id': 'mail', 'text': 'mail'}]},
{
'id': '3',
'text': 'Environnement',
'transports': [
{'id': 'mail', 'text': 'mail'},
{'id': 'sms', 'text': 'sms'},
{'id': 'rss', 'text': 'rss'},
],
},
{
'id': '4',
'text': u'Marchés publics',
'transports': [{'id': 'mail', 'text': 'mail'}, {'id': 'rss', 'text': 'rss'}],
},
{
'id': '5',
'text': "Offres d'emploi",
'transports': [{'id': 'mail', 'text': 'mail'}, {'id': 'rss', 'text': 'rss'}],
},
{
'id': '6',
'text': 'Infos créche',
'transports': [{'id': 'sms', 'text': 'sms'}, {'id': 'rss', 'text': 'rss'}],
},
{
'id': '7',
'text': 'Familles',
'transports': [{'id': 'mail', 'text': 'mail'}, {'id': 'sms', 'text': 'sms'}],
},
{
'id': '8',
'text': 'Travaux',
'transports': [
{'id': 'mail', 'text': 'mail'},
{'id': 'sms', 'text': 'sms'},
{'id': 'rss', 'text': 'rss'},
],
},
]
SUBSCRIPTIONS = [
{'id': '3', 'text': 'Environnement', 'transports': [{'id': 'mail', 'text': 'mail'}]},
{'id': '7', 'text': 'Familles', 'transports': [{'id': 'mail', 'text': 'mail'}]},
{'id': '5', 'text': "Offres d'emploi", 'transports': [{'id': 'mail', 'text': 'mail'}]},
{
'id': '6',
'text': 'Infos créche',
'transports': [{'id': 'sms', 'text': 'sms'}, {'id': 'rss', 'text': 'rss'}],
},
]
USER_EMAIL = 'foobar@example.com'
@pytest.fixture
def cell():
page = Page()
page.save()
cell = NewslettersCell(title='Newsletter test', url='http://example.org/', page=page, order=0)
cell.save()
return cell
@pytest.fixture
def user():
try:
user = User.objects.get(username='foo')
except User.DoesNotExist:
user = User.objects.create(username='foo', email=USER_EMAIL)
return user
@mock.patch('combo.apps.newsletters.models.requests.get')
def test_get_newsletters_by_transports(mock_get, cell):
restrictions = ('mail', 'sms')
cell.transports_restrictions = ','.join(restrictions)
expected_newsletters = []
for n in NEWSLETTERS:
for t in n['transports']:
if t['id'] in restrictions:
expected_newsletters.append(n)
continue
mock_json = mock.Mock()
mock_json.json.return_value = {'err': 0, 'data': NEWSLETTERS}
mock_get.return_value = mock_json
assert cell.get_newsletters() == expected_newsletters
assert mock_get.call_args[1]['without_user']
assert 'user' not in mock_get.call_args[1]
mock_get.side_effect = requests.RequestException
assert cell.get_newsletters() == []
@mock.patch('combo.apps.newsletters.models.requests.get')
def test_get_newsletters_by_unrestricted_transports(mock_get, cell):
cell.transports_restrictions = ''
expected_newsletters = []
for n in NEWSLETTERS:
for t in n['transports']:
expected_newsletters.append(n)
mock_json = mock.Mock()
mock_json.json.return_value = {'err': 0, 'data': NEWSLETTERS}
mock_get.return_value = mock_json
assert cell.get_newsletters() == expected_newsletters
@mock.patch('combo.apps.newsletters.models.requests.get')
def test_get_newsletters_by_resources(mock_get, cell):
restrictions = ('marches-publics', 'familles', 'democratie-locale')
cell.transports_restrictions = 'mail'
cell.resources_restrictions = ','.join(restrictions)
expected_newsletters = []
for n in NEWSLETTERS:
if slugify(n['text']) in restrictions:
expected_newsletters.append(n)
mock_json = mock.Mock()
mock_json.json.return_value = {'err': 0, 'data': NEWSLETTERS}
mock_get.return_value = mock_json
assert cell.get_newsletters() == expected_newsletters
@mock.patch('combo.apps.newsletters.models.requests.get')
def test_get_subscriptions(mock_get, cell, user):
restrictions = ('mail', 'sms')
cell.transports_restrictions = ','.join(restrictions)
expected_subscriptions = []
for n in SUBSCRIPTIONS:
for t in n['transports']:
if t['id'] in restrictions:
expected_subscriptions.append(n)
continue
mock_json = mock.Mock()
mock_json.json.return_value = {'err': 0, 'data': SUBSCRIPTIONS}
mock_get.return_value = mock_json
assert cell.get_subscriptions(user) == expected_subscriptions
assert mock_get.call_args[1]['user'].email == USER_EMAIL
mock_get.side_effect = requests.RequestException
assert cell.get_subscriptions(user) == []
@mock.patch('combo.utils.requests_wrapper.RequestsSession.send')
def test_get_subscriptions_signature_check(mock_send, cell, user):
restrictions = ('mail', 'sms')
cell.transports_restrictions = ','.join(restrictions)
expected_subscriptions = []
for n in SUBSCRIPTIONS:
for t in n['transports']:
if t['id'] in restrictions:
expected_subscriptions.append(n)
continue
mock_json = mock.Mock(status_code=200)
mock_json.json.return_value = {'err': 0, 'data': SUBSCRIPTIONS}
mock_send.return_value = mock_json
cell.get_subscriptions(user)
url = mock_send.call_args[0][0].url
assert check_query(url.split('?', 1)[-1], 'combo')
@mock.patch('combo.apps.newsletters.models.requests.post')
def test_failed_set_subscriptions(mock_post, cell, user):
restrictions = ('sms', 'mail')
cell.transports_restrictions = ','.join(restrictions)
subscriptions = [
{'id': '1', 'transports': [{'id': 'mail', 'text': 'mail'}]},
{'id': '7', 'transports': [{'id': 'sms', 'text': 'sms'}]},
{'id': '8', 'transports': [{'id': 'sms', 'text': 'sms'}, {'id': 'mail', 'text': 'mail'}]},
]
mock_post.side_effect = requests.ConnectionError
with pytest.raises(SubscriptionsSaveError):
cell.set_subscriptions(subscriptions, user, uuid='useruuid')
mock_post.side_effect = requests.HTTPError(response=mock.MagicMock())
with pytest.raises(SubscriptionsSaveError):
cell.set_subscriptions(subscriptions, user, uuid='useruuid')
def test_set_subscriptions_with_no_uuid(cell, user):
restrictions = ('sms', 'mail')
cell.transports_restrictions = ','.join(restrictions)
subscriptions = [
{'id': '1', 'transports': [{'id': 'mail', 'text': 'mail'}]},
{'id': '8', 'transports': [{'id': 'sms', 'text': 'sms'}, {'id': 'mail', 'text': 'mail'}]},
]
with pytest.raises(SubscriptionsSaveError):
cell.set_subscriptions(subscriptions, user)
@mock.patch('combo.apps.newsletters.models.requests.post')
def test_set_subscriptions(mock_post, cell, user):
restrictions = ('sms', 'mail')
cell.transports_restrictions = ','.join(restrictions)
subscriptions = [
{'id': '1', 'transports': [{'id': 'mail', 'text': 'mail'}]},
{'id': '7', 'transports': [{'id': 'sms', 'text': 'sms'}]},
{'id': '8', 'transports': [{'id': 'sms', 'text': 'sms'}, {'id': 'mail', 'text': 'mail'}]},
]
mock_json = mock.Mock()
mock_json.json.return_value = {'err': 0, 'data': True}
mock_post.return_value = mock_json
cell.set_subscriptions(subscriptions, user, uuid='useruuid')
args, kwargs = mock_post.call_args
assert kwargs['params']['uuid'] == 'useruuid'
assert kwargs['federation_key'] == 'email'
assert kwargs['user'].email == USER_EMAIL
@mock.patch('combo.apps.newsletters.models.requests.get')
def test_get_subscriptions_with_name_id_and_mobile(mock_get, cell, user):
restrictions = ('sms', 'mail')
cell.transports_restrictions = ','.join(restrictions)
mock_json = mock.Mock()
mock_json.json.return_value = {'err': 0, 'data': NEWSLETTERS}
mock_get.return_value = mock_json
fake_saml_request = mock.Mock()
fake_saml_request.user = mock.Mock(email=USER_EMAIL)
fake_saml_request.user.get_name_id.return_value = 'nameid'
fake_saml_request.session = {'mellon_session': {'mobile': '0607080900'}}
form = NewslettersManageForm(instance=cell, request=fake_saml_request)
args, kwargs = mock_get.call_args
assert kwargs['params'] == {'uuid': 'nameid', 'mobile': '0607080900'}
def mocked_requests_get(*args, **kwargs):
url = args[0]
class MockResponse(mock.Mock):
status_code = 200
def json(self):
return json.loads(self.content)
if 'newsletters' in url:
return MockResponse(content=json.dumps({'data': NEWSLETTERS}))
else:
return MockResponse(content=json.dumps({'data': SUBSCRIPTIONS}))
@mock.patch('combo.apps.newsletters.models.requests.get', side_effect=mocked_requests_get)
def test_subscriptions_form(mock_get, cell, user):
restrictions = ('sms', 'mail')
cell.transports_restrictions = ','.join(restrictions)
newsletters = [n['id'] for n in cell.get_newsletters()]
assert mock_get.call_args[1]['without_user']
subscriptions = [s['id'] for s in cell.get_subscriptions(user)]
fake_request = mock.Mock(user=mock.Mock(username='username', email=USER_EMAIL), session={})
form = NewslettersManageForm(instance=cell, request=fake_request)
# test if all newsletters are present
for f_id, field in form.fields.items():
assert f_id in newsletters
# test if initial fields are in the restrictions
for s in subscriptions:
assert set(form.fields[s].initial).intersection(set(restrictions))

View File

@ -1364,7 +1364,7 @@ def test_index_site_num_queries(settings, app):
assert IndexedCell.objects.count() == 50
with CaptureQueriesContext(connection) as ctx:
index_site()
assert len(ctx.captured_queries) == 224
assert len(ctx.captured_queries) == 223
SearchCell.objects.create(
page=page, placeholder='content', order=0, _search_services={'data': ['search1']}