sms: filter authorized numbers (#39650)

This commit is contained in:
Nicolas Roche 2021-02-02 12:20:55 +01:00
parent fa748c98cc
commit 93071920da
10 changed files with 538 additions and 3 deletions

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2021-02-02 12:04
from __future__ import unicode_literals
import django.contrib.postgres.fields
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('choosit', '0009_choositsmsgateway_max_message_length'),
]
operations = [
migrations.AddField(
model_name='choositsmsgateway',
name='allow_premium_rate',
field=models.BooleanField(
default=False,
help_text='This option is only applyed to France mainland',
verbose_name='Allow premium rate numbers',
),
),
migrations.AddField(
model_name='choositsmsgateway',
name='authorized',
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(
choices=[
('fr-metro', 'France mainland (+33 [67])'),
('fr-domtom', 'France DOM/TOM (+262, etc.)'),
('be', 'Belgian (+32 4[5-9]) '),
('all', 'All'),
],
max_length=32,
null=True,
),
default=['all'],
size=None,
verbose_name='Authorized Countries',
),
),
]

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2021-02-02 12:04
from __future__ import unicode_literals
import django.contrib.postgres.fields
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('mobyt', '0008_auto_20200310_1539'),
]
operations = [
migrations.AddField(
model_name='mobytsmsgateway',
name='allow_premium_rate',
field=models.BooleanField(
default=False,
help_text='This option is only applyed to France mainland',
verbose_name='Allow premium rate numbers',
),
),
migrations.AddField(
model_name='mobytsmsgateway',
name='authorized',
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(
choices=[
('fr-metro', 'France mainland (+33 [67])'),
('fr-domtom', 'France DOM/TOM (+262, etc.)'),
('be', 'Belgian (+32 4[5-9]) '),
('all', 'All'),
],
max_length=32,
null=True,
),
default=['all'],
size=None,
verbose_name='Authorized Countries',
),
),
]

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2021-02-02 12:04
from __future__ import unicode_literals
import django.contrib.postgres.fields
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('orange', '0008_auto_20200412_1240'),
]
operations = [
migrations.AddField(
model_name='orangesmsgateway',
name='allow_premium_rate',
field=models.BooleanField(
default=False,
help_text='This option is only applyed to France mainland',
verbose_name='Allow premium rate numbers',
),
),
migrations.AddField(
model_name='orangesmsgateway',
name='authorized',
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(
choices=[
('fr-metro', 'France mainland (+33 [67])'),
('fr-domtom', 'France DOM/TOM (+262, etc.)'),
('be', 'Belgian (+32 4[5-9]) '),
('all', 'All'),
],
max_length=32,
null=True,
),
default=['all'],
size=None,
verbose_name='Authorized Countries',
),
),
]

View File

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2021-02-02 12:04
from __future__ import unicode_literals
import django.contrib.postgres.fields
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ovh', '0012_auto_20201027_1121'),
]
operations = [
migrations.AddField(
model_name='ovhsmsgateway',
name='allow_premium_rate',
field=models.BooleanField(
default=False,
help_text='This option is only applyed to France mainland',
verbose_name='Allow premium rate numbers',
),
),
migrations.AddField(
model_name='ovhsmsgateway',
name='authorized',
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(
choices=[
('fr-metro', 'France mainland (+33 [67])'),
('fr-domtom', 'France DOM/TOM (+262, etc.)'),
('be', 'Belgian (+32 4[5-9]) '),
('all', 'All'),
],
max_length=32,
null=True,
),
default=['all'],
size=None,
verbose_name='Authorized Countries',
),
),
migrations.AlterField(
model_name='ovhsmsgateway',
name='alert_emails',
field=django.contrib.postgres.fields.ArrayField(
base_field=models.EmailField(blank=True, max_length=254),
blank=True,
null=True,
size=None,
verbose_name='Email addresses list to send credit alerts to, separated by comma',
),
),
]

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2021-02-02 12:04
from __future__ import unicode_literals
import django.contrib.postgres.fields
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('oxyd', '0008_oxydsmsgateway_max_message_length'),
]
operations = [
migrations.AddField(
model_name='oxydsmsgateway',
name='allow_premium_rate',
field=models.BooleanField(
default=False,
help_text='This option is only applyed to France mainland',
verbose_name='Allow premium rate numbers',
),
),
migrations.AddField(
model_name='oxydsmsgateway',
name='authorized',
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(
choices=[
('fr-metro', 'France mainland (+33 [67])'),
('fr-domtom', 'France DOM/TOM (+262, etc.)'),
('be', 'Belgian (+32 4[5-9]) '),
('all', 'All'),
],
max_length=32,
null=True,
),
default=['all'],
size=None,
verbose_name='Authorized Countries',
),
),
]

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2021-02-02 12:04
from __future__ import unicode_literals
import django.contrib.postgres.fields
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('twilio', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='twiliosmsgateway',
name='allow_premium_rate',
field=models.BooleanField(
default=False,
help_text='This option is only applyed to France mainland',
verbose_name='Allow premium rate numbers',
),
),
migrations.AddField(
model_name='twiliosmsgateway',
name='authorized',
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(
choices=[
('fr-metro', 'France mainland (+33 [67])'),
('fr-domtom', 'France DOM/TOM (+262, etc.)'),
('be', 'Belgian (+32 4[5-9]) '),
('all', 'All'),
],
max_length=32,
null=True,
),
default=['all'],
size=None,
verbose_name='Authorized Countries',
),
),
]

View File

@ -1,8 +1,38 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import forms
from django.utils.translation import ugettext_lazy as _
from passerelle.forms import GenericConnectorForm
class SmsTestSendForm(forms.Form):
number = forms.CharField(label=_('To'), max_length=12)
sender = forms.CharField(label=_('From'), max_length=12)
message = forms.CharField(label=_('Message'), max_length=128)
class SMSConnectorForm(GenericConnectorForm):
def __init__(self, *args, **kwargs):
from passerelle.sms.models import SMSResource
super(SMSConnectorForm, self).__init__(*args, **kwargs)
self.fields['authorized'] = forms.MultipleChoiceField(
choices=SMSResource.AUTHORIZED,
widget=forms.CheckboxSelectMultiple,
initial=[SMSResource.ALL],
label=_('Authorized Countries'),
)

View File

@ -16,6 +16,7 @@
import logging
import re
from django.contrib.postgres.fields import ArrayField
from django.db import models
from django.utils import six
from django.utils.module_loading import import_string
@ -23,9 +24,11 @@ from django.utils.translation import ugettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.compat import json_loads
from passerelle.sms.forms import SMSConnectorForm
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
SEND_SCHEMA = {
'$schema': 'http://json-schema.org/draft-04/schema#',
"type": "object",
@ -49,6 +52,7 @@ SEND_SCHEMA = {
class SMSResource(BaseResource):
manager_form_base_class = SMSConnectorForm
category = _('SMS Providers')
documentation_url = (
'https://doc-publik.entrouvert.com/admin-fonctionnel/les-tutos/configuration-envoi-sms/'
@ -62,15 +66,47 @@ class SMSResource(BaseResource):
default_trunk_prefix = models.CharField(
verbose_name=_('Default trunk prefix'), max_length=2, default=u'0'
) # Yeah France first !
# FIXME: add regexp field, to check destination and from format
max_message_length = models.IntegerField(_('Maximum message length'), default=160)
manager_view_template_name = 'passerelle/manage/messages_service_view.html'
FR_METRO = 'fr-metro'
FR_DOMTOM = 'fr-domtom'
BE_ = 'be'
ALL = 'all'
AUTHORIZED = [
(FR_METRO, _('France mainland (+33 [67])')),
(FR_DOMTOM, _('France DOM/TOM (+262, etc.)')),
(BE_, _('Belgian (+32 4[5-9]) ')),
(ALL, _('All')),
]
authorized = ArrayField(
models.CharField(max_length=32, null=True, choices=AUTHORIZED),
verbose_name=_('Authorized Countries'),
default=[ALL],
)
allow_premium_rate = models.BooleanField(
_('Allow premium rate numbers'),
default=False,
help_text=_('This option is only applyed to France mainland'),
)
@classmethod
def get_management_urls(cls):
return import_string('passerelle.sms.urls.management_urlpatterns')
def _get_authorized_display(self):
result = []
for key, value in self.AUTHORIZED:
if key in self.authorized:
result.append(str(value))
return ', '.join(result)
def __init__(self, *args, **kwargs):
super(SMSResource, self).__init__(*args, **kwargs)
self.get_authorized_display = self._get_authorized_display
def clean_numbers(self, destinations):
numbers = []
for dest in destinations:
@ -93,6 +129,50 @@ class SMSResource(BaseResource):
numbers.append(number)
return numbers
def authorize_numbers(self, destinations):
number_regexes = {
'premium_rate': [r'^0033[8]\d{8}$'],
SMSResource.FR_METRO: [r'^0033[67]\d{8}$'],
SMSResource.FR_DOMTOM: [
r'^00262262\d{6}$', # Réunion, Mayotte, Terres australe/antarctiques
r'^508508\d{6}$', # Saint-Pierre-et-Miquelon
r'^590590\d{6}$', # Guadeloupe, Saint-Barthélemy, Saint-Martin
r'^594594\d{6}$', # Guyane
r'^596596\d{6}$', # Martinique
r'^00687[67]\d{8}$', # Nouvelle-Calédonie
],
SMSResource.BE_: [r'^00324[5-9]\d{7}$'],
}
premium_numbers = set()
if not self.allow_premium_rate:
regex = re.compile('|'.join(number_regexes['premium_rate']))
premium_numbers = set(dest for dest in destinations if regex.match(dest))
foreign_numbers = set()
if SMSResource.ALL not in self.authorized:
regexes = []
for country in self.authorized:
regexes += number_regexes[country]
regex = re.compile('|'.join(regexes))
foreign_numbers = set(dest for dest in destinations if not regex.match(dest))
authorized_numbers = sorted(set(destinations) - foreign_numbers - premium_numbers, key=int)
premium_numbers_string = ", ".join(sorted(premium_numbers, key=int))
foreign_numbers_string = ", ".join(sorted(foreign_numbers - premium_numbers, key=int))
if premium_numbers_string:
logging.warning('unauthorized premium rate phone number: %s', premium_numbers_string)
if foreign_numbers_string:
logging.warning('unauthorized foreign phone number: %s', foreign_numbers_string)
if len(authorized_numbers) == 0:
raise APIError('no phone number was authorized: %s' % ', '.join(destinations))
warnings = {
'deny premium rate phone numbers': premium_numbers_string,
'deny foreign phone numbers': foreign_numbers_string,
}
return authorized_numbers, warnings
@endpoint(
perm='can_send_messages',
methods=['post'],
@ -103,6 +183,7 @@ class SMSResource(BaseResource):
def send(self, request, post_data, nostop=None):
post_data['message'] = post_data['message'][: self.max_message_length]
post_data['to'] = self.clean_numbers(post_data['to'])
post_data['to'], warnings = self.authorize_numbers(post_data['to'])
logging.info('sending SMS to %r from %r', post_data['to'], post_data['from'])
stop = nostop is None # ?nostop in not in query string
self.add_job(
@ -112,7 +193,7 @@ class SMSResource(BaseResource):
destinations=post_data['to'],
stop=stop,
)
return {'err': 0}
return {'err': 0, 'warn': warnings}
def send_job(self, *args, **kwargs):
self.send_msg(**kwargs)

View File

@ -29,6 +29,7 @@ class SmsTestSendView(GenericConnectorMixin, FormView):
connector = self.get_object()
try:
number = connector.clean_numbers([number])[0]
number = connector.authorize_numbers([number])[0][0]
connector.send_msg(text=message, sender=sender, destinations=[number], stop=False)
except APIError as exc:
messages.error(self.request, _('Sending SMS fails: %s' % exc))

View File

@ -1,5 +1,21 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import isodate
import json
import logging
import mock
import pytest
from requests import RequestException
@ -7,9 +23,11 @@ from requests import RequestException
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from django.utils.translation import ugettext as _
from passerelle.apps.choosit.models import ChoositSMSGateway
from passerelle.apps.ovh.models import OVHSMSGateway
from passerelle.base.models import ApiUser, AccessRight, Job
from passerelle.base.models import ApiUser, AccessRight, Job, ResourceLog
from passerelle.sms.models import SMSResource, SMSLog
from passerelle.utils.jsonresponse import APIError
@ -37,6 +55,70 @@ def test_clean_numbers():
connector.clean_numbers(['0123'])
def test_authorize_numbers():
connector = OVHSMSGateway()
# premium-rate
assert connector.allow_premium_rate == False
number = '0033' + '8' + '12345678'
with pytest.raises(APIError, match='no phone number was authorized: %s' % number):
connector.authorize_numbers([number])
connector.allow_premium_rate = True
connector.save()
assert connector.authorize_numbers([number])[0] == [number]
# All country
assert connector.authorized == [SMSResource.ALL]
number = '0033' + '1' + '12345678'
assert connector.authorize_numbers([number])[0] == [number]
connector.authorized = [SMSResource.FR_METRO]
connector.save()
with pytest.raises(APIError, match='no phone number was authorized: %s' % number):
connector.authorize_numbers([number])
# France
number = '0033' + '6' + '12345678'
assert connector.authorize_numbers([number])[0] == [number]
connector.authorized = [SMSResource.FR_DOMTOM]
connector.save()
with pytest.raises(APIError, match='no phone number was authorized: %s' % number):
connector.authorize_numbers([number])
# Dom-Tom
number = '596596' + '123456'
assert connector.authorize_numbers([number])[0] == [number]
connector.authorized = [SMSResource.BE_]
connector.save()
with pytest.raises(APIError, match='no phone number was authorized: %s' % number):
connector.authorize_numbers([number])
# Belgian
number = '0032' + '45' + '1234567'
assert connector.authorize_numbers([number])[0] == [number]
connector.authorized = [SMSResource.FR_METRO]
connector.save()
with pytest.raises(APIError, match='no phone number was authorized: %s' % number):
connector.authorize_numbers([number])
# Don't raise if authorized destinations are not empty
connector.allow_premium_rate = False
connector.authorized = [SMSResource.FR_METRO]
connector.save()
numbers = [
'0033' + '8' + '12345678',
'0033' + '1' + '12345678',
'0033' + '6' + '12345678',
'596596' + '123456',
'0032' + '45' + '1234567',
]
authorized_numbers, warnings = connector.authorize_numbers(numbers)
assert authorized_numbers == ['0033612345678']
assert warnings == {
'deny premium rate phone numbers': '0033812345678',
'deny foreign phone numbers': '0032451234567, 0033112345678, 596596123456',
}
@pytest.fixture(params=klasses)
def connector(request, db):
klass = request.param
@ -420,3 +502,69 @@ def test_ovh_token_request(admin_user, app):
assert 'Successfuly completed connector configuration' in resp.text
connector.refresh_from_db()
assert connector.consumer_key == 'xyz'
@pytest.mark.parametrize('connector', [ChoositSMSGateway], indirect=True)
def test_manager(admin_user, app, connector):
app = login(app)
path = '/%s/%s/' % (connector.get_connector_slug(), connector.slug)
resp = app.get(path)
assert (
'33'
in [
x.text
for x in resp.html.find('div', {'id': 'description'}).find_all('p')
if x.text.startswith(_('Default country code'))
][0]
)
assert (
_('All')
in [x.text for x in resp.html.find_all('p') if x.text.startswith(_('Authorized Countries'))][0]
)
assert (
_('no')
in [x.text for x in resp.html.find_all('p') if x.text.startswith(_('Allow premium rate numbers'))][0]
)
path = '/manage/%s/%s/edit' % (connector.get_connector_slug(), connector.slug)
resp = app.get(path)
resp.form['authorized'] = []
resp = resp.form.submit()
assert resp.html.find('div', {'class': 'errornotice'}).p.text == 'There were errors processing your form.'
assert resp.html.find('div', {'class': 'error'}).text.strip() == 'This field is required.'
resp.form['authorized'] = [SMSResource.FR_METRO, SMSResource.FR_DOMTOM]
resp = resp.form.submit()
resp = resp.follow()
assert (
_('France mainland (+33 [67])')
in [x.text for x in resp.html.find_all('p') if x.text.startswith(_('Authorized Countries'))][0]
)
path = '/%s/%s/send/' % (connector.get_connector_slug(), connector.slug)
payload = {
'message': 'plop',
'from': '+33699999999',
'to': ['+33688888888'],
}
resp = app.post_json(path, params=payload)
assert resp.json['warn'] == {
'deny premium rate phone numbers': '',
'deny foreign phone numbers': '',
}
with mock.patch.object(type(connector), 'send_msg') as send_function:
send_function.return_value = {}
connector.jobs()
assert SMSLog.objects.count() == 1
payload['to'][0] = '+33188888888'
SMSLog.objects.all().delete()
app.post_json(path, params=payload)
with mock.patch.object(type(connector), 'send_msg') as send_function:
send_function.return_value = {}
connector.jobs()
assert not SMSLog.objects.count()
assert ResourceLog.objects.filter(levelno=logging.WARNING).count() == 1
assert (
ResourceLog.objects.filter(levelno=30)[0].extra['exception']
== 'no phone number was authorized: 0033188888888'
)