# hobo/hobo/contrib/ozwillo/scripts/synchronize_ozwillo_users.py
#
# 149 lines
# 5.6 KiB
# Python

# Ozwillo plugin to deploy Publik
# Copyright (C) 2017-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import datetime
import logging
from urllib import urlencode
import pprint
import os
import requests
from robobrowser.browser import RoboBrowser
from django.utils.six.moves.urllib import parse as urlparse
from hobo.multitenant.middleware import TenantMiddleware
from tenant_schemas.utils import tenant_context
def run_on_all_tenants(f):
    """Call ``f(tenant)`` once per tenant, inside that tenant's context.

    Tenants are the ones returned by ``TenantMiddleware.get_tenants()``.
    An error raised while processing one tenant is logged with its
    traceback and does not prevent the remaining tenants from being
    processed.

    :param f: callable taking the tenant as its only argument.
    """
    for tenant in TenantMiddleware.get_tenants():
        with tenant_context(tenant):
            try:
                f(tenant)
            except Exception:
                # Deliberately not a bare ``except:``: KeyboardInterrupt and
                # SystemExit must still be able to stop the whole run.
                logging.exception('unable to provision')
def provision_users(tenant):
    """Provision local accounts for the Ozwillo users of one tenant.

    Steps, in order:

    1. find the tenant's Ozwillo OIDC provider;
    2. log in as ``settings.OZWILLO_ADMIN`` (indexed as ``[0]`` login,
       ``[1]`` password) by driving the provider's HTML login form and the
       form served after it with a scripted browser;
    3. exchange the authorization code found in the redirect for an access
       token;
    4. fetch the instance users from the ``/apps/acl/instance/`` endpoint;
    5. create or update the local user linked to each ``OIDCAccount``.

    :param tenant: tenant being synchronised; only ``get_base_url()`` is
        called on it here (presumably the caller has already activated
        the tenant context -- see ``run_on_all_tenants``).
    :returns: ``None``. Expected failures (missing setting, no provider,
        unexpected page, missing code or token) are logged as warnings and
        end the synchronisation for this tenant. Other errors propagate to
        the caller.
    """
    from django.conf import settings

    logger = logging.getLogger('ozwillo_synchro')

    if not getattr(settings, 'OZWILLO_ADMIN', None):
        logger.warning('No OZWILLO_ADMIN setting found')
        return

    from authentic2_auth_oidc.models import OIDCProvider

    # Pick the first provider whose issuer looks like an Ozwillo one; the
    # ``else`` branch of the loop runs only when no provider matched.
    for provider in OIDCProvider.objects.all():
        if 'ozwillo' in provider.issuer or 'accounts.sictiam.fr' in provider.issuer:
            auth_url = provider.authorization_endpoint
            token_url = provider.token_endpoint
            client_id = provider.client_id
            client_secret = provider.client_secret
            redirect_uri = urlparse.urljoin(tenant.get_base_url(), '/accounts/oidc/callback/')
            # The users endpoint is derived from the token endpoint by
            # swapping "accounts" for "kernel" in its URL.
            instance_users_url = urlparse.urljoin(
                token_url.replace('accounts', 'kernel'), '/apps/acl/instance/')
            break
    else:
        logger.warning('No ozwillo OIDC provider found for %s', tenant.get_base_url())
        return

    session = requests.Session()
    # NOTE(review): TLS certificate verification is disabled for the whole
    # scripted-browser session -- confirm this is intentional.
    session.verify = False
    br = RoboBrowser(user_agent='Mozilla/5.0 (X11; Linux x86_64; rv:45.0) Gecko/20100101 Firefox/45.0',
                     session=session)

    # --- Authorization request -------------------------------------------
    query = urlencode({
        'client_id': client_id,
        'response_type': 'code',
        'scope': 'openid offline_access',
        'prompt': 'consent',
        'redirect_uri': redirect_uri
    })
    br.open(auth_url + '?%s' % query)
    response = br.response
    if response.status_code != 200:
        logger.warning(u'OIDC authorization request failed: %s %r',
                       response.status_code, response.content[:1000])
        return

    # --- Login form ------------------------------------------------------
    br.session.headers['Referer'] = response.request.url
    form = br.get_form(action='/a/login')
    if form is None:
        logger.warning(u'No login form found: %s %r',
                       response.status_code, response.content[:1000])
        return
    form['u'] = settings.OZWILLO_ADMIN[0]
    form['pwd'] = settings.OZWILLO_ADMIN[1]
    br.submit_form(form=form)

    # --- Form served after login (first form on the page) ----------------
    response = br.response
    br.session.headers['Referer'] = response.request.url
    form = br.get_form()
    if form is None:
        logger.warning(u'No form found after login: %s %r',
                       response.status_code, response.content[:1000])
        return
    # Do not follow the redirect: the authorization code is carried by the
    # Location header pointing at our callback URL.
    br.submit_form(form=form, allow_redirects=False)
    response = br.response
    if 'Location' not in response.headers:
        # Log the response to the form submission itself, not the initial
        # authorization response.
        logger.warning(u'Not authorization form %r %r',
                       response.status_code, response.content[:1000])
        return

    location = response.headers['location']
    cb_url = urlparse.urlparse(location)
    codes = urlparse.parse_qs(cb_url.query).get('code')
    if not codes:
        logger.warning(u'No authorization code in callback URL %r', location)
        return
    code = codes[0]

    # --- Token request ---------------------------------------------------
    logger.info('Getting token from %s', token_url)
    response = requests.post(token_url, auth=(client_id, client_secret),
                             data={
                                 'code': code,
                                 'redirect_uri': redirect_uri,
                                 'grant_type': 'authorization_code'})
    token_response = response.json()
    # NOTE(review): this writes the full token response, access token
    # included, to the logs -- confirm that this is acceptable.
    logger.info('Got %s', token_response)
    if 'access_token' not in token_response:
        logger.warning(u'No access token in token response (status %s)', response.status_code)
        return
    access_token = token_response['access_token']

    # --- Instance users --------------------------------------------------
    users_url = instance_users_url + client_id
    logger.info('Getting instance users from %s', users_url)
    response = requests.get(users_url,
                            headers={'Authorization': 'Bearer %s' % access_token})
    instance_users = response.json()
    logger.info('Got %s', instance_users)
    if not isinstance(instance_users, list):
        logger.warning(u'Unexpected instance users payload (status %s)', response.status_code)
        return

    from django.contrib.auth import get_user_model
    from authentic2_auth_oidc.models import OIDCAccount

    User = get_user_model()
    for user in instance_users:
        logger.info('Provisionning email %s with sub %s', user['user_email_address'], user['user_id'])
        # Create a candidate user first, then try to bind it to the
        # (provider, sub) pair. If the account already existed the candidate
        # is discarded; if the pair ends up stored more than once after our
        # insert, our rows are removed and the attempt is retried.
        while True:
            new_user = User.objects.create()
            oidc_account, created = OIDCAccount.objects.select_related().get_or_create(
                provider=provider, sub=user['user_id'], defaults={'user': new_user})
            if not created:
                new_user.delete()
                new_user = oidc_account.user
                break
            if OIDCAccount.objects.filter(provider=provider, sub=user['user_id']).count() > 1:
                oidc_account.delete()
                new_user.delete()
                continue
            logger.info('provisionned with uuid %s', new_user.uuid)
            break

        # Only hit the database when at least one attribute actually changed.
        wanted = (
            ('username', user['user_name']),
            ('email', user['user_email_address']),
            ('ou', provider.ou),
        )
        save = False
        for attribute, value in wanted:
            if getattr(new_user, attribute) != value:
                setattr(new_user, attribute, value)
                save = True
        if save:
            new_user.save()
# Script body: executed as soon as the file is run, it applies
# provision_users to every tenant in turn.
run_on_all_tenants(provision_users)