This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
ckanext-ozwillo-pyoidc/ckanext/ozwillo_pyoidc/plugin.py

140 lines
5.1 KiB
Python
Executable File

import logging
import conf
import ckan.plugins as plugins
import ckan.plugins.toolkit as toolkit
from ckan.common import session, c, request
from ckan import model
import ckan.lib.base as base
from pylons import config, request
import conf
from oidc import create_client
plugin_config_prefix = 'ckanext.ozwillo_pyoidc.'
log = logging.getLogger(__name__)
plugin_controller = 'ckanext.ozwillo_pyoidc.plugin:OpenidController'
CLIENT = None
class OzwilloPyoidcPlugin(plugins.SingletonPlugin):
plugins.implements(plugins.IConfigurer)
plugins.implements(plugins.IRoutes)
plugins.implements(plugins.IAuthenticator, inherit=True)
def before_map(self, map):
map.connect('/organization/{id:.*}/sso',
controller=plugin_controller,
action='sso')
map.connect('/organization/{id:.*}/callback',
controller=plugin_controller,
action='callback')
map.connect('/user/slo',
controller=plugin_controller,
action='slo')
map.redirect('/organization/{id:.*}/logout', '/user/_logout')
return map
def after_map(self, map):
return map
def identify(self):
user = session.get('user')
if user and not toolkit.c.userobj:
userobj = model.User.get(user)
toolkit.c.user = userobj.name
toolkit.c.userobj = userobj
def login(self):
global CLIENT
if 'organization_id' in session:
g = model.Group.get(session['organization_id'])
conf.CLIENT['client_registration'].update({
'client_id': g._extras['client_id'].value,
'client_secret': g._extras['client_secret'].value,
'redirect_uris': [toolkit.url_for(host=request.host,
controller=plugin_controller,
action='callback',
id=g.name,
qualified=True)]
})
log.info('registration info for organization "%s" set' % g.name)
CLIENT = create_client(**conf.CLIENT)
url, ht_args = CLIENT.create_authn_request(session, conf.ACR_VALUES)
if ht_args:
toolkit.request.headers.update(ht_args)
toolkit.redirect_to(url)
else:
toolkit.redirect_to('/')
def logout(self):
pass
def update_config(self, config_):
toolkit.add_template_directory(config_, 'templates')
toolkit.add_public_directory(config_, 'public')
toolkit.add_resource('fanstatic', 'ozwillo_pyoidc')
class OpenidController(base.BaseController):
def sso(self, id):
log.info('SSO for organization "%s"' % id)
session['organization_id'] = id
session.save()
log.info('redirecting to login page')
login_url = toolkit.url_for(host=request.host,
controller='user',
action='login',
qualified=True)
toolkit.redirect_to(login_url)
def callback(self):
global CLIENT
if CLIENT:
userinfo = CLIENT.callback(request.GET)
log.info('Received userinfo: %s' % userinfo)
userobj = model.User.get(userinfo['nickname'])
if userobj:
userobj.email = userinfo['email']
if 'given_name' in userinfo:
userobj.fullname = userinfo['given_name']
if 'family_name' in userinfo:
userobj.fullname += userinfo['family_name']
userobj.save()
session['user'] = userobj.id
session.save()
org_url = toolkit.url_for(host=request.host,
controller="organization",
action='read',
id=session['organization_id'],
qualified=True)
toolkit.redirect_to(org_url)
def slo(self):
"""
Revokes the delivered access token. Logs out the user
"""
global CLIENT
logout_url = CLIENT.end_session_endpoint
org_url = toolkit.url_for(host=request.host,
controller='organization',
action='read',
id=session['organization_id'],
qualified=True)
redirect_uri = org_url + '/logout'
# revoke the access token
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
data = 'token=%s&token_type_hint=access_token' % CLIENT.access_token
CLIENT.http_request(CLIENT.revocation_endpoint, 'POST',
data=data, headers=headers)
# redirect to IDP logout
logout_url += '?id_token_hint=%s&' % CLIENT.id_token
logout_url += 'post_logout_redirect_uri=%s' % redirect_uri
toolkit.redirect_to(str(logout_url))