313 lines
13 KiB
Python
313 lines
13 KiB
Python
# welco - multichannel request processing
|
|
# Copyright (C) 2018 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import json
|
|
import re
|
|
|
|
import pytest
|
|
from django.test import override_settings
|
|
from django.urls import reverse
|
|
from django.utils import six
|
|
from django.utils.encoding import force_text
|
|
from django.utils.timezone import now, timedelta
|
|
|
|
from welco.sources.phone import models
|
|
|
|
pytestmark = pytest.mark.django_db
|
|
|
|
|
|
def test_call_start_stop(client):
|
|
assert models.PhoneCall.objects.count() == 0
|
|
payload = {
|
|
'event': 'start',
|
|
'caller': '0033699999999',
|
|
'callee': '102',
|
|
'data': {
|
|
'user': 'boby.lapointe',
|
|
},
|
|
}
|
|
response = client.post(reverse('phone-call-event'), json.dumps(payload), content_type='application/json')
|
|
assert response.status_code == 200
|
|
assert response['content-type'] == 'application/json'
|
|
assert response.json() == {'err': 0}
|
|
assert models.PhoneCall.objects.count() == 1
|
|
assert (
|
|
models.PhoneCall.objects.filter(
|
|
caller='0033699999999', callee='102', data=json.dumps(payload['data']), stop__isnull=True
|
|
).count()
|
|
== 1
|
|
)
|
|
# new start event
|
|
response = client.post(reverse('phone-call-event'), json.dumps(payload), content_type='application/json')
|
|
assert response.status_code == 200
|
|
assert response['content-type'] == 'application/json'
|
|
assert response.json() == {'err': 0}
|
|
assert models.PhoneCall.objects.count() == 2
|
|
assert (
|
|
models.PhoneCall.objects.filter(
|
|
caller='0033699999999', callee='102', data=json.dumps(payload['data']), stop__isnull=True
|
|
).count()
|
|
== 1
|
|
)
|
|
# first call has been closed
|
|
assert (
|
|
models.PhoneCall.objects.filter(
|
|
caller='0033699999999', callee='102', data=json.dumps(payload['data']), stop__isnull=False
|
|
).count()
|
|
== 1
|
|
)
|
|
payload['event'] = 'stop'
|
|
response = client.post(reverse('phone-call-event'), json.dumps(payload), content_type='application/json')
|
|
assert response.status_code == 200
|
|
assert response['content-type'] == 'application/json'
|
|
assert response.json() == {'err': 0}
|
|
assert models.PhoneCall.objects.count() == 2
|
|
assert (
|
|
models.PhoneCall.objects.filter(
|
|
caller='0033699999999', callee='102', data=json.dumps(payload['data']), stop__isnull=False
|
|
).count()
|
|
== 2
|
|
)
|
|
# stop is idempotent
|
|
response = client.post(reverse('phone-call-event'), json.dumps(payload), content_type='application/json')
|
|
assert response.status_code == 200
|
|
assert response['content-type'] == 'application/json'
|
|
assert response.json() == {'err': 0}
|
|
assert models.PhoneCall.objects.count() == 2
|
|
assert (
|
|
models.PhoneCall.objects.filter(
|
|
caller='0033699999999', callee='102', data=json.dumps(payload['data']), stop__isnull=False
|
|
).count()
|
|
== 2
|
|
)
|
|
|
|
|
|
def test_one_call_per_callee(user, client):
|
|
assert models.PhoneCall.objects.count() == 0
|
|
payload = {'event': 'start', 'caller': '0033699999999', 'callee': '102'}
|
|
response = client.post(reverse('phone-call-event'), json.dumps(payload), content_type='application/json')
|
|
assert response.status_code == 200
|
|
assert models.PhoneCall.objects.filter(callee='102', stop__isnull=True).count() == 1 # active
|
|
assert models.PhoneCall.objects.filter(callee='102', stop__isnull=False).count() == 0 # inactive
|
|
|
|
# new caller, same callee: stops the last call, start a new one
|
|
payload['caller'] = '00337123456789'
|
|
response = client.post(reverse('phone-call-event'), json.dumps(payload), content_type='application/json')
|
|
assert response.status_code == 200
|
|
assert models.PhoneCall.objects.count() == 2
|
|
assert (
|
|
models.PhoneCall.objects.filter(caller='00337123456789', callee='102', stop__isnull=True).count() == 1
|
|
)
|
|
assert (
|
|
models.PhoneCall.objects.filter(caller='0033699999999', callee='102', stop__isnull=False).count() == 1
|
|
)
|
|
|
|
with override_settings(PHONE_ONE_CALL_PER_CALLEE=False):
|
|
# accept multiple call: start a new one, don't stop anything
|
|
payload['caller'] = '00221774261500'
|
|
response = client.post(
|
|
reverse('phone-call-event'), json.dumps(payload), content_type='application/json'
|
|
)
|
|
assert response.status_code == 200
|
|
assert models.PhoneCall.objects.count() == 3
|
|
assert models.PhoneCall.objects.filter(callee='102', stop__isnull=True).count() == 2
|
|
assert models.PhoneCall.objects.filter(callee='102', stop__isnull=False).count() == 1
|
|
# same caller: stop his last call, add a new one
|
|
response = client.post(
|
|
reverse('phone-call-event'), json.dumps(payload), content_type='application/json'
|
|
)
|
|
assert response.status_code == 200
|
|
assert models.PhoneCall.objects.count() == 4
|
|
assert models.PhoneCall.objects.filter(callee='102', stop__isnull=True).count() == 2
|
|
assert models.PhoneCall.objects.filter(callee='102', stop__isnull=False).count() == 2
|
|
|
|
|
|
def test_current_calls(user, client):
|
|
# create some calls
|
|
for number in range(0, 10):
|
|
payload = {
|
|
'event': 'start',
|
|
'caller': '00336999999%02d' % number,
|
|
'callee': '1%02d' % number,
|
|
'data': {
|
|
'user': 'boby.lapointe',
|
|
},
|
|
}
|
|
response = client.post(
|
|
reverse('phone-call-event'), json.dumps(payload), content_type='application/json'
|
|
)
|
|
assert response.status_code == 200
|
|
assert response['content-type'] == 'application/json'
|
|
assert response.json() == {'err': 0}
|
|
|
|
# register user to some lines
|
|
# then remove from some
|
|
for number in range(0, 10):
|
|
models.PhoneLine.take(callee='1%02d' % number, user=user)
|
|
for number in range(5, 10):
|
|
models.PhoneLine.release(callee='1%02d' % number, user=user)
|
|
client.login(username='toto', password='toto')
|
|
response = client.get(reverse('phone-current-calls'))
|
|
assert response.status_code == 200
|
|
assert response['content-type'] == 'application/json'
|
|
payload = response.json()
|
|
assert isinstance(payload, dict)
|
|
assert set(payload.keys()) == {'err', 'data'}
|
|
assert payload['err'] == 0
|
|
data = payload['data']
|
|
assert set(data.keys()) == {'calls', 'lines', 'all-lines'}
|
|
assert isinstance(data['calls'], list)
|
|
assert isinstance(data['lines'], list)
|
|
assert isinstance(data['all-lines'], list)
|
|
assert len(data['calls']) == 5
|
|
assert len(data['lines']) == 5
|
|
assert len(data['all-lines']) == 10
|
|
for call in data['calls']:
|
|
assert set(call.keys()) <= {'caller', 'callee', 'start', 'data'}
|
|
assert isinstance(call['caller'], str)
|
|
assert isinstance(call['callee'], str)
|
|
assert isinstance(call['start'], str)
|
|
if 'data' in call:
|
|
assert isinstance(call['data'], dict)
|
|
assert len([call for call in data['lines'] if isinstance(call, str)]) == 5
|
|
assert len([call for call in data['all-lines'] if isinstance(call, str)]) == 10
|
|
|
|
# unregister user to all remaining lines
|
|
for number in range(0, 5):
|
|
models.PhoneLine.release(callee='1%02d' % number, user=user)
|
|
response = client.get(reverse('phone-current-calls'))
|
|
assert response.status_code == 200
|
|
assert response['content-type'] == 'application/json'
|
|
payload = response.json()
|
|
assert isinstance(payload, dict)
|
|
assert set(payload.keys()) == {'err', 'data'}
|
|
assert payload['err'] == 0
|
|
assert set(payload['data'].keys()) == {'calls', 'lines', 'all-lines'}
|
|
assert len(payload['data']['calls']) == 0
|
|
assert len(payload['data']['lines']) == 0
|
|
assert len(payload['data']['all-lines']) == 10
|
|
|
|
|
|
def test_take_release_line(user, client):
|
|
client.login(username='toto', password='toto')
|
|
|
|
assert models.PhoneLine.objects.count() == 0
|
|
payload = {
|
|
'callee': '102',
|
|
}
|
|
response = client.post(reverse('phone-take-line'), json.dumps(payload), content_type='application/json')
|
|
assert response.status_code == 200
|
|
assert response['content-type'] == 'application/json'
|
|
assert response.json() == {'err': 0}
|
|
assert models.PhoneLine.objects.count() == 1
|
|
assert models.PhoneLine.objects.filter(users=user, callee='102').count() == 1
|
|
response = client.post(
|
|
reverse('phone-release-line'), json.dumps(payload), content_type='application/json'
|
|
)
|
|
assert response.status_code == 200
|
|
assert response['content-type'] == 'application/json'
|
|
assert response.json() == {'err': 0}
|
|
assert models.PhoneLine.objects.count() == 1
|
|
assert models.PhoneLine.objects.filter(users=user, callee='102').count() == 0
|
|
|
|
|
|
def test_phone_zone(user, client):
|
|
client.login(username='toto', password='toto')
|
|
response = client.get(reverse('phone-zone'))
|
|
assert response.status_code == 200
|
|
assert 'You do not have a phoneline configured' in force_text(response.content)
|
|
|
|
models.PhoneLine.take(callee='102', user=user)
|
|
|
|
response = client.get(reverse('phone-zone'))
|
|
assert response.status_code == 200
|
|
assert 'You do not have a phoneline configured' not in force_text(response.content)
|
|
assert '<li>102' in force_text(response.content)
|
|
assert 'data-callee="102"' in force_text(response.content)
|
|
currents = re.search(
|
|
'<div id="source-mainarea" ' 'data-current-calls="/api/phone/current-calls/">' '(.*?)</div>',
|
|
force_text(response.content),
|
|
flags=re.DOTALL,
|
|
)
|
|
assert currents.group(1).strip() == ''
|
|
|
|
# create a call
|
|
payload = {'event': 'start', 'caller': '003369999999', 'callee': '102'}
|
|
response = client.post(reverse('phone-call-event'), json.dumps(payload), content_type='application/json')
|
|
assert response.status_code == 200
|
|
response = client.get(reverse('phone-zone'))
|
|
assert response.status_code == 200
|
|
assert '<h1>Current Call: <strong>003369999999</strong></h1>' in force_text(response.content)
|
|
|
|
# simulate a mellon user
|
|
session = client.session
|
|
session['mellon_session'] = {'username': ['agent007@ldap']}
|
|
session.save()
|
|
response = client.get(reverse('phone-zone'))
|
|
assert response.status_code == 200
|
|
assert 'agent007' not in force_text(response.content)
|
|
assert 'data-callee="agent007"' not in force_text(response.content)
|
|
assert '<li>102' in force_text(response.content)
|
|
assert 'data-callee="102"' in force_text(response.content)
|
|
|
|
with override_settings(PHONE_AUTOTAKE_MELLON_USERNAME=True):
|
|
response = client.get(reverse('phone-zone'))
|
|
assert response.status_code == 200
|
|
assert '<h1>Current Call: <strong>003369999999</strong></h1>' in force_text(response.content)
|
|
assert 'agent007' in force_text(response.content)
|
|
assert 'data-callee="agent007"' in force_text(response.content)
|
|
assert '<li>102' in force_text(response.content)
|
|
assert 'data-callee="102"' in force_text(response.content)
|
|
|
|
|
|
def test_call_expiration(user, client):
|
|
assert models.PhoneCall.objects.count() == 0
|
|
# create a call
|
|
payload = {'event': 'start', 'caller': '003369999999', 'callee': '102'}
|
|
response = client.post(reverse('phone-call-event'), json.dumps(payload), content_type='application/json')
|
|
assert response.status_code == 200
|
|
assert models.PhoneCall.objects.filter(stop__isnull=True).count() == 1
|
|
|
|
# get list of calls for line 102
|
|
models.PhoneLine.take(callee='102', user=user)
|
|
client.login(username='toto', password='toto')
|
|
response = client.get(reverse('phone-current-calls'))
|
|
assert response.status_code == 200
|
|
payload = response.json()
|
|
assert payload['err'] == 0
|
|
assert len(payload['data']['calls']) == 1
|
|
|
|
# start call 10 minutes ago
|
|
models.PhoneCall.objects.filter(stop__isnull=True).update(start=now() - timedelta(minutes=10))
|
|
|
|
# get list of calls without expiration
|
|
response = client.get(reverse('phone-current-calls'))
|
|
assert response.status_code == 200
|
|
payload = response.json()
|
|
assert payload['err'] == 0
|
|
assert len(payload['data']['calls']) == 1 # still here
|
|
|
|
# get list of calls with an expiration of 2 minutes (< 10 minutes)
|
|
with override_settings(PHONE_MAX_CALL_DURATION=2):
|
|
response = client.get(reverse('phone-current-calls'))
|
|
assert response.status_code == 200
|
|
payload = response.json()
|
|
assert payload['err'] == 0
|
|
assert len(payload['data']['calls']) == 0 # call is expired
|
|
|
|
assert models.PhoneCall.objects.filter(stop__isnull=True).count() == 0 # active calls
|
|
assert models.PhoneCall.objects.filter(stop__isnull=False).count() == 1 # stopped calls
|