authentic/tests/test_idp_cas.py

540 lines
26 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib.parse
from django.contrib.auth import get_user_model
from django.test.client import Client, RequestFactory
from django.test.utils import override_settings
from django.utils.encoding import force_str
from authentic2.a2_rbac.models import Role
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.constants import AUTHENTICATION_EVENTS_SESSION_KEY, NONCE_FIELD_NAME
from authentic2_idp_cas import constants
from authentic2_idp_cas.models import Attribute, Service, Ticket
from .utils import Authentic2TestCase, assert_event
# Prefix -> namespace URI map passed to the XPath assertions made on CAS XML responses.
CAS_NAMESPACES = dict(cas=constants.CAS_NAMESPACE)

# User model class as returned by Django's get_user_model().
User = get_user_model()
@override_settings(A2_IDP_CAS_ENABLE=True)
class CasTests(Authentic2TestCase):
    """Tests of the CAS identity provider endpoints served under ``/idp/cas/``.

    Two CAS services are created in ``setUp``; most tests log the test user in
    through the first one and then query a ticket validation endpoint with a
    separate, session-less client.
    """

    LOGIN = 'test'
    PASSWORD = 'test'
    EMAIL = 'test@example.com'
    FIRST_NAME = 'John'
    LAST_NAME = 'Doe'
    NAME = 'CAS service'
    SLUG = 'cas-service'
    URL = 'https://casclient.com/%C3%A9/'
    NAME2 = 'CAS service2'
    SLUG2 = 'cas-service2'
    # Two whitespace-separated URLs registered for the second service.
    URL2 = 'https://casclient2.com/ https://other.com/'
    SERVICE2_URL = 'https://casclient2.com/service/'
    PGT_URL = 'https://casclient.con/pgt/'

    def setUp(self):
        """Create the test user, two CAS services with one attribute each, and a role."""
        self.user = User.objects.create_user(
            self.LOGIN,
            password=self.PASSWORD,
            email=self.EMAIL,
            first_name=self.FIRST_NAME,
            last_name=self.LAST_NAME,
        )
        # First service: identified by username, exposes the email attribute.
        self.service = Service.objects.create(
            name=self.NAME,
            slug=self.SLUG,
            urls=self.URL,
            identifier_attribute='django_user_username',
            ou=get_default_ou(),
            logout_url=self.URL + 'logout/',
        )
        self.service_attribute1 = Attribute.objects.create(
            service=self.service, slug='email', attribute_name='django_user_email'
        )
        # Second service: identified by email, exposes the username attribute.
        self.service2 = Service.objects.create(
            name=self.NAME2,
            slug=self.SLUG2,
            urls=self.URL2,
            ou=get_default_ou(),
            identifier_attribute='django_user_email',
        )
        self.service2_attribute1 = Attribute.objects.create(
            service=self.service2, slug='username', attribute_name='django_user_username'
        )
        # Role used by the access-control tests; it is not granted to anyone here.
        self.authorized_service = Role.objects.create(name='rogue', ou=get_default_ou())
        self.factory = RequestFactory()

    # ------------------------------------------------------------------
    # Helpers shared by the tests below
    # ------------------------------------------------------------------

    def _submit_credentials(self, client, location):
        """Fetch the login page at *location*, then post the test user's credentials to it.

        Returns the response to the POST; redirects are not followed.
        """
        client.get(location)
        return client.post(
            location,
            {'login-password-submit': '', 'username': self.LOGIN, 'password': self.PASSWORD},
            follow=False,
        )

    def _login_with_client(self, client):
        """Log the test user in to ``self.service`` with *client*, without intermediate checks.

        Returns the response obtained by following the redirect issued after
        the credentials were posted.
        """
        response = client.get('/idp/cas/login', {constants.SERVICE_PARAM: self.URL})
        response = self._submit_credentials(client, response['Location'])
        return client.get(response.url)

    def _login_and_get_service_ticket(self, login_url='/idp/cas/login'):
        """Run the browser side of a CAS login to ``self.service`` using ``self.client``.

        Each redirect, the authentication events stored in the session and the
        CAS logout state are checked along the way.

        Returns the ticket identifier carried by the final redirect to the service.
        """
        response = self.client.get(login_url, {constants.SERVICE_PARAM: self.URL})
        self.assertEqual(response.status_code, 302)
        ticket = Ticket.objects.get()

        # The first redirect targets the login page with a nonce and a next URL.
        location = response['Location']
        url = location.split('?')[0]
        query = urllib.parse.parse_qs(location.split('?')[1])
        self.assertTrue(url.endswith('/login/'))
        self.assertIn('nonce', query)
        self.assertIn('next', query)
        self.assertEqual(query['nonce'], [ticket.ticket_id])

        # The next URL is the CAS continue view, carrying the service and the nonce.
        next_url, next_url_query = query['next'][0].split('?')
        next_url_query = urllib.parse.parse_qs(next_url_query)
        self.assertEqual(next_url, '/idp/cas/continue/')
        self.assertEqual(set(next_url_query.keys()), {constants.SERVICE_PARAM, NONCE_FIELD_NAME})
        self.assertEqual(next_url_query[constants.SERVICE_PARAM], [self.URL])
        self.assertEqual(next_url_query[NONCE_FIELD_NAME], [ticket.ticket_id])

        response = self._submit_credentials(self.client, location)
        self.assertIn(AUTHENTICATION_EVENTS_SESSION_KEY, self.client.session)
        self.assertIn('nonce', self.client.session[AUTHENTICATION_EVENTS_SESSION_KEY][0])
        self.assertIn(ticket.ticket_id, self.client.session[AUTHENTICATION_EVENTS_SESSION_KEY][0]['nonce'])
        self.assertRedirectsComplex(response, query['next'][0], nonce=ticket.ticket_id)

        response = self.client.get(response.url)
        self.assertRedirectsComplex(response, self.URL, ticket=ticket.ticket_id)

        # Check logout state has been updated
        ticket = Ticket.objects.get()
        self.assertIn(constants.SESSION_CAS_LOGOUTS, self.client.session)
        self.assertEqual(
            self.client.session[constants.SESSION_CAS_LOGOUTS],
            [
                [
                    ticket.service.name,
                    ticket.service.logout_url,
                    ticket.service.logout_use_iframe,
                    ticket.service.logout_use_iframe_timeout,
                ]
            ],
        )
        return urllib.parse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_long_urls(self):
        """Saving a service with a several-hundred-character URL must not raise."""
        self.service.urls = (
            'https://casclient.com/%C3%A9/lorem/ipsum/dolor/sit/amet/consectetur/adipiscing/elit/sed/do/eiusm'
            'od/tempor/incididunt/ut/labore/et/dolore/magna/aliqua/ut/enim/ad/minim/veniam/quis/nostrud/exerc'
            'itation/ullamco/laboris/nisi/ut/aliquip/ex/ea/commodo/consequat/duis/aute/irure/dolor/in/reprehe'
            'nderit/in/voluptate/velit/esse/cillum/dolore/eu/fugiat/nulla/pariatur/excepteur/sint/occaecat/cu'
            'pidatat/non/proident/sunt/in/culpa/qui/officia/deserunt/mollit/anim/id/est/laborum'
        )
        self.service.save()

    def test_service_matching(self):
        """``Service.objects.for_service`` resolves registered URLs and returns None otherwise."""
        self.service.clean()
        self.service2.clean()
        self.assertEqual(Service.objects.for_service(self.URL), self.service)
        # Every URL listed for the second service must resolve to it.
        for service in self.URL2.split():
            self.assertEqual(Service.objects.for_service(service), self.service2)
        self.assertEqual(Service.objects.for_service('http://google.com'), None)

    def test_login_failure(self):
        """Login requests that cannot lead to an interactive login."""
        client = Client()
        # No service parameter at all: bad request.
        response = client.get('/idp/cas/login')
        self.assertEqual(response.status_code, 400)
        self.assertIn('no service', force_str(response.content))
        # Service URL not registered by any service.
        response = client.get('/idp/cas/login', {constants.SERVICE_PARAM: 'http://google.com/'})
        self.assertRedirectsComplex(response, 'http://google.com/')
        # Both renew and gateway are given.
        response = client.get(
            '/idp/cas/login',
            {constants.SERVICE_PARAM: self.URL, constants.RENEW_PARAM: '', constants.GATEWAY_PARAM: ''},
        )
        self.assertRedirectsComplex(response, self.URL)
        # Gateway is given while this client holds no authenticated session.
        response = client.get(
            '/idp/cas/login', {constants.SERVICE_PARAM: self.URL, constants.GATEWAY_PARAM: ''}
        )
        self.assertRedirectsComplex(response, self.URL)

    def test_role_access_control_denied(self):
        """A user lacking the authorized role gets a denial event and the unauthorized URL."""
        client = Client()
        service = self.service
        service.add_authorized_role(self.authorized_service)
        service.unauthorized_url = 'https://casclient.com/loser/'
        service.save()
        assert service.authorized_roles.exists() is True
        response = self._login_with_client(client)
        assert_event(
            'user.service.sso.denial',
            session=client.session,
            user=self.user,
            service=self.service,
        )
        self.assertIn('https://casclient.com/loser/', force_str(response.content))

    def test_role_access_control_granted(self):
        """A user holding the authorized role obtains a ticket that validates."""
        client = Client()
        service = self.service
        service.add_authorized_role(self.authorized_service)
        User.objects.get(username=self.LOGIN).roles.add(self.authorized_service)
        assert service.authorized_roles.exists() is True
        response = self._login_with_client(client)
        # Do not use the same client for direct calls from the CAS service provider
        # to prevent use of the user session
        client = Client()
        ticket_id = urllib.parse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
        response = client.get(
            '/idp/cas/validate', {constants.TICKET_PARAM: ticket_id, constants.SERVICE_PARAM: self.URL}
        )
        # The ticket must validate exactly as it does without role restriction.
        self.assertEqual(response.status_code, 200)
        self.assertEqual(force_str(response.content), 'yes\n%s\n' % self.LOGIN)

    def test_login_validate(self):
        """``/idp/cas/validate`` answers in plain text and consumes the ticket."""
        ticket_id = self._login_and_get_service_ticket()
        # Do not use the same client for direct calls from the CAS service provider
        # to prevent use of the user session
        client = Client()
        response = client.get(
            '/idp/cas/validate', {constants.TICKET_PARAM: ticket_id, constants.SERVICE_PARAM: self.URL}
        )
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response['content-type'], 'text/plain')
        self.assertEqual(force_str(response.content), 'yes\n%s\n' % self.LOGIN)
        # Verify ticket has been deleted
        with self.assertRaises(Ticket.DoesNotExist):
            Ticket.objects.get()

    def test_login_service_validate(self):
        """``/idp/cas/serviceValidate`` returns the user and attributes as XML."""
        # This test reaches the login endpoint through its trailing-slash form.
        ticket_id = self._login_and_get_service_ticket('/idp/cas/login/')
        # Do not use the same client for direct calls from the CAS service provider
        # to prevent use of the user session
        client = Client()
        response = client.get(
            '/idp/cas/serviceValidate', {constants.TICKET_PARAM: ticket_id, constants.SERVICE_PARAM: self.URL}
        )
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response['content-type'], 'text/xml')
        constraints = (
            ('/cas:serviceResponse/cas:authenticationSuccess/cas:user', self.LOGIN),
            ('/cas:serviceResponse/cas:authenticationSuccess/cas:attributes/cas:email', self.EMAIL),
        )
        self.assertXPathConstraints(response, constraints, CAS_NAMESPACES)
        # Verify ticket has been deleted
        with self.assertRaises(Ticket.DoesNotExist):
            Ticket.objects.get()

    def test_login_service_validate_without_renew_failure(self):
        """Validating with ``renew`` a ticket obtained without it fails with INVALID_TICKET_SPEC."""
        ticket_id = self._login_and_get_service_ticket()
        # Do not use the same client for direct calls from the CAS service provider
        # to prevent use of the user session
        client = Client()
        response = client.get(
            '/idp/cas/serviceValidate',
            {constants.TICKET_PARAM: ticket_id, constants.SERVICE_PARAM: self.URL, constants.RENEW_PARAM: ''},
        )
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response['content-type'], 'text/xml')
        constraints = (('/cas:serviceResponse/cas:authenticationFailure/@code', 'INVALID_TICKET_SPEC'),)
        self.assertXPathConstraints(response, constraints, CAS_NAMESPACES)
        # Verify ticket has been deleted
        with self.assertRaises(Ticket.DoesNotExist):
            Ticket.objects.get()

    def test_login_proxy_validate_on_service_ticket(self):
        """``/idp/cas/proxyValidate`` also accepts a plain service ticket."""
        ticket_id = self._login_and_get_service_ticket()
        # Do not use the same client for direct calls from the CAS service provider
        # to prevent use of the user session
        client = Client()
        response = client.get(
            '/idp/cas/proxyValidate', {constants.TICKET_PARAM: ticket_id, constants.SERVICE_PARAM: self.URL}
        )
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response['content-type'], 'text/xml')
        constraints = (
            ('/cas:serviceResponse/cas:authenticationSuccess/cas:user', self.LOGIN),
            ('/cas:serviceResponse/cas:authenticationSuccess/cas:attributes/cas:email', self.EMAIL),
        )
        self.assertXPathConstraints(response, constraints, CAS_NAMESPACES)
        # Verify ticket has been deleted
        with self.assertRaises(Ticket.DoesNotExist):
            Ticket.objects.get()

    # NOTE(review): presumably this setting skips the callback to the PGT URL - confirm.
    @override_settings(A2_IDP_CAS_CHECK_PGT_URL=False)
    def test_proxy(self):
        """Full proxy scenario: PGT issuance, proxy ticket request, validation, PGT invalidation."""
        ticket_id = self._login_and_get_service_ticket()
        # Do not use the same client for direct calls from the CAS service provider
        # to prevent use of the user session
        client = Client()
        response = client.get(
            '/idp/cas/serviceValidate',
            {
                constants.TICKET_PARAM: ticket_id,
                constants.SERVICE_PARAM: self.URL,
                constants.PGT_URL_PARAM: self.PGT_URL,
            },
        )
        # The PGT is looked up in the validating client's session, keyed by its PGTIOU.
        for key in client.session.keys():
            if key.startswith(constants.PGT_IOU_PREFIX):
                pgt_iou = key
                pgt = client.session[key]
                break
        else:
            self.fail('PGTIOU- not found in session')
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response['content-type'], 'text/xml')
        constraints = (
            ('/cas:serviceResponse/cas:authenticationSuccess/cas:user', self.LOGIN),
            ('/cas:serviceResponse/cas:authenticationSuccess/cas:proxyGrantingTicket', pgt_iou),
        )
        self.assertXPathConstraints(response, constraints, CAS_NAMESPACES)
        # Verify service ticket has been deleted
        with self.assertRaises(Ticket.DoesNotExist):
            Ticket.objects.get(ticket_id=ticket_id)
        # Verify pgt ticket exists
        pgt_ticket = Ticket.objects.get(ticket_id=pgt)
        self.assertEqual(pgt_ticket.user, self.user)
        self.assertIsNone(pgt_ticket.expire)
        self.assertEqual(pgt_ticket.service, self.service)
        self.assertEqual(pgt_ticket.service_url, self.URL)
        self.assertEqual(pgt_ticket.proxies, self.PGT_URL)

        # Try to get a proxy ticket for service 2
        # it should fail since no proxy authorization exists
        client = Client()
        response = client.get(
            '/idp/cas/proxy', {constants.PGT_PARAM: pgt, constants.TARGET_SERVICE_PARAM: self.SERVICE2_URL}
        )
        constraints = (('/cas:serviceResponse/cas:proxyFailure/@code', 'PROXY_UNAUTHORIZED'),)
        self.assertXPathConstraints(response, constraints, CAS_NAMESPACES)

        # Set proxy authorization
        self.service2.proxy.add(self.service)
        # Try again !
        response = client.get(
            '/idp/cas/proxy', {constants.PGT_PARAM: pgt, constants.TARGET_SERVICE_PARAM: self.SERVICE2_URL}
        )
        pt = Ticket.objects.get(ticket_id__startswith=constants.PT_PREFIX)
        self.assertEqual(pt.user, self.user)
        self.assertIsNotNone(pt.expire)
        self.assertEqual(pt.service, self.service2)
        self.assertEqual(pt.service_url, self.SERVICE2_URL)
        self.assertEqual(pt.proxies, self.PGT_URL)
        constraints = (('/cas:serviceResponse/cas:proxySuccess/cas:proxyTicket', pt.ticket_id),)
        self.assertXPathConstraints(response, constraints, CAS_NAMESPACES)

        # Now service2 try to resolve the proxy ticket
        client = Client()
        response = client.get(
            '/idp/cas/proxyValidate',
            {constants.TICKET_PARAM: pt.ticket_id, constants.SERVICE_PARAM: self.SERVICE2_URL},
        )
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response['content-type'], 'text/xml')
        # Service 2 identifies users by email and exposes the username attribute.
        constraints = (
            ('/cas:serviceResponse/cas:authenticationSuccess/cas:user', self.EMAIL),
            ('/cas:serviceResponse/cas:authenticationSuccess/cas:attributes/cas:username', self.LOGIN),
        )
        self.assertXPathConstraints(response, constraints, CAS_NAMESPACES)
        # Verify ticket has been deleted
        with self.assertRaises(Ticket.DoesNotExist):
            Ticket.objects.get(ticket_id=pt.ticket_id)

        # Check invalidation of PGT when session is closed
        self.client.logout()
        response = client.get(
            '/idp/cas/proxy', {constants.PGT_PARAM: pgt, constants.TARGET_SERVICE_PARAM: self.SERVICE2_URL}
        )
        constraints = (
            ('/cas:serviceResponse/cas:proxyFailure', 'session has expired'),
            ('/cas:serviceResponse/cas:proxyFailure/@code', 'BAD_PGT'),
        )
        self.assertXPathConstraints(response, constraints, CAS_NAMESPACES)